You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2017/08/26 03:20:39 UTC
[3/3] hive git commit: HIVE-17205 - add functional support for
unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)
HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6be50b76
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6be50b76
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6be50b76
Branch: refs/heads/master
Commit: 6be50b76be5956b3c52ed6024fd7b4a3dee65fb6
Parents: 262d8f9
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Aug 25 20:14:57 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Aug 25 20:15:26 2017 -0700
----------------------------------------------------------------------
.../streaming/AbstractRecordWriter.java | 44 +-
.../hive/hcatalog/streaming/HiveEndPoint.java | 4 +-
.../apache/hive/hcatalog/streaming/package.html | 21 +-
.../hive/hcatalog/streaming/TestStreaming.java | 86 +-
.../hive/metastore/TestHiveMetaStore.java | 8 +-
.../apache/hadoop/hive/ql/TestAcidOnTez.java | 367 +++-
.../hive/ql/TestAcidOnTezWithSplitUpdate.java | 28 -
.../test/resources/testconfiguration.properties | 4 +-
.../TransactionalValidationListener.java | 42 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 4 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 22 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 60 +-
.../hive/ql/io/orc/OrcRawRecordMerger.java | 225 +-
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 14 +-
.../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 10 +-
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 26 +-
.../optimizer/SortedDynPartitionOptimizer.java | 1 +
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 6 -
.../hive/ql/txn/compactor/CompactorMR.java | 3 +-
.../apache/hadoop/hive/ql/TestTxnCommands.java | 108 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 12 +-
.../hadoop/hive/ql/TestTxnCommandsBase.java | 162 ++
.../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 297 +++
.../hive/ql/io/orc/TestInputOutputFormat.java | 28 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 86 +-
.../queries/clientnegative/create_not_acid.q | 2 +-
.../queries/clientpositive/acid_no_buckets.q | 210 ++
.../clientnegative/create_not_acid.q.out | 4 +-
.../clientnegative/delete_non_acid_table.q.out | 2 +-
.../clientnegative/update_non_acid_table.q.out | 2 +-
.../clientpositive/llap/acid_no_buckets.q.out | 1976 ++++++++++++++++++
31 files changed, 3534 insertions(+), 330 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index e409e75..4ec10ad 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -46,6 +46,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
@@ -54,19 +55,23 @@ import java.util.Properties;
public abstract class AbstractRecordWriter implements RecordWriter {
static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
- final HiveConf conf;
- final HiveEndPoint endPoint;
+ private final HiveConf conf;
+ private final HiveEndPoint endPoint;
final Table tbl;
- final IMetaStoreClient msClient;
- protected final List<Integer> bucketIds;
- ArrayList<RecordUpdater> updaters = null;
+ private final IMetaStoreClient msClient;
+ final List<Integer> bucketIds;
+ private ArrayList<RecordUpdater> updaters = null;
- public final int totalBuckets;
+ private final int totalBuckets;
+ /**
+ * Indicates whether target table is bucketed
+ */
+ private final boolean isBucketed;
private final Path partitionPath;
- final AcidOutputFormat<?,?> outf;
+ private final AcidOutputFormat<?,?> outf;
private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
private Long curBatchMinTxnId;
private Long curBatchMaxTxnId;
@@ -109,16 +114,22 @@ public abstract class AbstractRecordWriter implements RecordWriter {
this.tbl = twp.tbl;
this.partitionPath = twp.partitionPath;
}
- this.totalBuckets = tbl.getSd().getNumBuckets();
- if (totalBuckets <= 0) {
- throw new StreamingException("Cannot stream to table that has not been bucketed : "
- + endPoint);
+ this.isBucketed = tbl.getSd().getNumBuckets() > 0;
+ /**
+ * For unbucketed tables we have exactly 1 RecordUpdater for each AbstractRecordWriter which
+ * ends up writing to a file bucket_000000
+ * See also {@link #getBucket(Object)}
+ */
+ this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1;
+ if(isBucketed) {
+ this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
+ this.bucketFieldData = new Object[bucketIds.size()];
+ }
+ else {
+ bucketIds = Collections.emptyList();
}
- this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
- this.bucketFieldData = new Object[bucketIds.size()];
String outFormatName = this.tbl.getSd().getOutputFormat();
outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
- bucketFieldData = new Object[bucketIds.size()];
} catch(InterruptedException e) {
throw new StreamingException(endPoint2.toString(), e);
} catch (MetaException | NoSuchObjectException e) {
@@ -169,6 +180,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
// returns the bucket number to which the record belongs to
protected int getBucket(Object row) throws SerializationError {
+ if(!isBucketed) {
+ return 0;
+ }
ObjectInspector[] inspectors = getBucketObjectInspectors();
Object[] bucketFields = getBucketFields(row);
return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
@@ -204,7 +218,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
curBatchMaxTxnId = maxTxnID;
updaters = new ArrayList<RecordUpdater>(totalBuckets);
for (int bucket = 0; bucket < totalBuckets; bucket++) {
- updaters.add(bucket, null);
+ updaters.add(bucket, null);//so that get(i) returns null rather than throwing IndexOutOfBoundsException
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 81f6155..28c98bd 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -20,6 +20,8 @@ package org.apache.hive.hcatalog.streaming;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.cli.CliSessionState;
@@ -338,7 +340,7 @@ public class HiveEndPoint {
// 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
Map<String, String> params = t.getParameters();
if (params != null) {
- String transactionalProp = params.get("transactional");
+ String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (transactionalProp == null || !transactionalProp.equalsIgnoreCase("true")) {
LOG.error("'transactional' property is not set on Table " + endPoint);
throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property" +
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
index ed4d307..a879b97 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
@@ -30,7 +30,7 @@ partition. Once data is committed it becomes immediately visible to
all Hive queries initiated subsequently.</p>
<p>
-This API is intended for streaming clients such as Flume and Storm,
+This API is intended for streaming clients such as NiFi, Flume and Storm,
which continuously generate data. Streaming support is built on top of
ACID based insert/update support in Hive.</p>
@@ -56,10 +56,7 @@ A few things are currently required to use streaming.
<ol>
<li> Currently, only ORC storage format is supported. So
'<b>stored as orc</b>' must be specified during table creation.</li>
- <li> The hive table must be bucketed, but not sorted. So something like
- '<b>clustered by (<i>colName</i>) into <i>10</i> buckets</b>' must
- be specified during table creation. The number of buckets
- is ideally the same as the number of streaming writers.</li>
+ <li> The hive table may be bucketed but must not be sorted. </li>
<li> User of the client streaming process must have the necessary
permissions to write to the table or partition and create partitions in
the table.</li>
@@ -67,7 +64,6 @@ A few things are currently required to use streaming.
<ol>
<li><b>hive.input.format =
org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li>
- <li><b>hive.vectorized.execution.enabled = false</b></li>
</ol></li>
The above client settings are a temporary requirement and the intention is to
drop the need for them in the near future.
@@ -165,8 +161,21 @@ additional implementations of the <b>RecordWriter</b> interface.
- Delimited text input.</li>
<li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a>
- JSON text input.</li>
+ <li> <a href="StrictRegexWriter.html"><b>StrictRegexWriter</b></a>
+ - text input with regex.</li>
</ul></p>
+<h2>Performance, Concurrency, Etc.</h2>
+<p>
+ Each StreamingConnection is writing data at the rate the underlying
+ FileSystem can accept it. If that is not sufficient, multiple StreamingConnection objects can
+ be created concurrently.
+</p>
+<p>
+ Each StreamingConnection can have at most 1 outstanding TransactionBatch and each TransactionBatch
+ may have at most 2 threads operating on it.
+ See <a href="TransactionBatch.html"><b>TransactionBatch</b></a>
+</p>
</body>
</html>
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index f3ef92b..49520ef 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
@@ -65,6 +67,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -74,6 +77,7 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -193,7 +197,6 @@ public class TestStreaming {
conf = new HiveConf(this.getClass());
conf.set("fs.raw.impl", RawFileSystem.class.getName());
- conf.set("hive.enforce.bucketing", "true");
conf
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
@@ -339,6 +342,83 @@ public class TestStreaming {
}
}
+ /**
+ * Test that streaming can write to unbucketed table.
+ */
+ @Test
+ public void testNoBuckets() throws Exception {
+ queryTable(driver, "drop table if exists default.streamingnobuckets");
+ //todo: why does it need transactional_properties?
+ queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')");
+ queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
+ List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
+ Assert.assertEquals(1, rs.size());
+ Assert.assertEquals("foo\tbar", rs.get(0));
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+ String[] colNames1 = new String[] { "a", "b" };
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("a1,b2".getBytes());
+ txnBatch.write("a3,b4".getBytes());
+ txnBatch.commit();
+ txnBatch.beginNextTransaction();
+ txnBatch.write("a5,b6".getBytes());
+ txnBatch.write("a7,b8".getBytes());
+ txnBatch.commit();
+ txnBatch.close();
+
+ Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+ rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000016_0000016_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+ Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+ Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+
+ queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
+ queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
+ rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b");
+ int row = 0;
+ Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
+ Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
+ Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
+ Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
+
+ queryTable(driver, "alter table default.streamingnobuckets compact 'major'");
+ runWorker(conf);
+ rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+ }
+
+ /**
+ * This is a clone of TestTxnCommands2.runWorker(HiveConf).
+ */
+ public static void runWorker(HiveConf hiveConf) throws MetaException {
+ AtomicBoolean stop = new AtomicBoolean(true);
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(hiveConf);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+ }
// stream data into streaming table with N buckets, then copy the data into another bucketed table
// check if bucketing in both was done in the same way
@@ -453,8 +533,8 @@ public class TestStreaming {
}
/**
- * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, String...)} - there is
- * little value in using InputFormat directly
+ * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} -
+ * there is little value in using InputFormat directly
*/
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
index 8bd23cc..50e5274 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
@@ -3018,7 +3018,7 @@ public abstract class TestHiveMetaStore extends TestCase {
Table t = createTable(dbName, tblName, owner, params, null, sd, 0);
Assert.assertTrue("Expected exception", false);
} catch (MetaException e) {
- Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+ Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
}
// Fail - "transactional" is set to true, and the table is bucketed, but doesn't use ORC
@@ -3031,7 +3031,7 @@ public abstract class TestHiveMetaStore extends TestCase {
Table t = createTable(dbName, tblName, owner, params, null, sd, 0);
Assert.assertTrue("Expected exception", false);
} catch (MetaException e) {
- Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+ Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
}
// Succeed - "transactional" is set to true, and the table is bucketed, and uses ORC
@@ -3064,13 +3064,14 @@ public abstract class TestHiveMetaStore extends TestCase {
tblName += "1";
params.clear();
sd.unsetBucketCols();
+ sd.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
t = createTable(dbName, tblName, owner, params, null, sd, 0);
params.put("transactional", "true");
t.setParameters(params);
client.alter_table(dbName, tblName, t);
Assert.assertTrue("Expected exception", false);
} catch (MetaException e) {
- Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+ Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
}
// Succeed - trying to set "transactional" to "true", and satisfies bucketing and Input/OutputFormat requirement
@@ -3078,6 +3079,7 @@ public abstract class TestHiveMetaStore extends TestCase {
params.clear();
sd.setNumBuckets(1);
sd.setBucketCols(bucketCols);
+ sd.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
t = createTable(dbName, tblName, owner, params, null, sd, 0);
params.put("transactional", "true");
t.setParameters(params);
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 2bf9871..d0b5cf6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -22,7 +22,10 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -30,23 +33,26 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
-import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class resides in itests to facilitate running query using Tez engine, since the jars are
* fully loaded here, which is not the case if it stays in ql.
*/
public class TestAcidOnTez {
+ static final private Logger LOG = LoggerFactory.getLogger(TestAcidOnTez.class);
private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestAcidOnTez.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
@@ -61,8 +67,10 @@ public class TestAcidOnTez {
private static enum Table {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart"),
+ ACIDNOBUCKET("acidNoBucket"),
NONACIDORCTBL("nonAcidOrcTbl"),
- NONACIDPART("nonAcidPart");
+ NONACIDPART("nonAcidPart"),
+ NONACIDNONBUCKET("nonAcidNonBucket");
private final String name;
@Override
@@ -159,6 +167,359 @@ public class TestAcidOnTez {
testJoin("tez", "MapJoin");
}
+ /**
+ * Tests non acid to acid conversion where starting table has non-standard layout, i.e.
+ * where "original" files are not immediate children of the partition dir
+ */
+ @Test
+ public void testNonStandardConversion01() throws Exception {
+ HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+ setupTez(confForTez);
+ //CTAS with non-ACID target table
+ runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC TBLPROPERTIES('transactional'='false') as " +
+ "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
+
+ List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME");
+ String expected0[][] = {
+ {"1\t2", "/1/000000_0"},
+ {"3\t4", "/1/000000_0"},
+ {"5\t6", "/1/000000_0"},
+ {"5\t6", "/2/000000_0"},
+ {"7\t8", "/2/000000_0"},
+ {"9\t10", "/2/000000_0"},
+ };
+ Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected0.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected0[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1]));
+ }
+ //make the table ACID
+ runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true')");
+
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+ LOG.warn("after ctas:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+ /*
+ * Expected results: entry [0] is the RecordIdentifier + row data, entry [1] is the data file the row is read from before compaction. */
+ String expected[][] = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"},
+ };
+ Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+ }
+ //perform some Update/Delete
+ runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b = 80 where a = 7");
+ runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 5");
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+ LOG.warn("after update/delete:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ String[][] expected2 = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+ {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000021_0000021_0000/bucket_00000"}
+ };
+ Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+ }
+ //now make sure delete deltas are present
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ String[] expectedDelDelta = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000"};
+ for(FileStatus stat : status) {
+ for(int i = 0; i < expectedDelDelta.length; i++) {
+ if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) {
+ expectedDelDelta[i] = null;
+ }
+ }
+ }
+ for(int i = 0; i < expectedDelDelta.length; i++) {
+ Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]);
+ }
+ //run Minor compaction
+ runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'minor'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+ LOG.warn("after compact minor:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+ //verify the data is the same
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ //todo: since HIVE-16669 is not done, Minor compact compacts insert delta as well - it should not
+ //Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+ }
+ //check we have right delete delta files after minor compaction
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ String[] expectedDelDelta2 = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000", "delete_delta_0000021_0000022"};
+ for(FileStatus stat : status) {
+ for(int i = 0; i < expectedDelDelta2.length; i++) {
+ if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
+ expectedDelDelta2[i] = null;
+ break;
+ }
+ }
+ }
+ for(int i = 0; i < expectedDelDelta2.length; i++) {
+ Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]);
+ }
+ //run Major compaction
+ runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+ LOG.warn("after compact major:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size());
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ //everything is now in base/
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000022/bucket_00000"));
+ }
+ }
+ /**
+ * Tests non acid to acid conversion where starting table has non-standard layout, i.e.
+ * where "original" files are not immediate children of the partition dir - partitioned table
+ *
+ * How to do this? CTAS is the only way to create data files which are not immediate children
+ * of the partition dir. CTAS/Union/Tez doesn't support partition tables. The only way is to copy
+ * data files in directly.
+ */
+ @Ignore("HIVE-17214")
+ @Test
+ public void testNonStandardConversion02() throws Exception {
+ HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+ confForTez.setBoolean("mapred.input.dir.recursive", true);
+ setupTez(confForTez);
+ runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC " +
+ "TBLPROPERTIES('transactional'='false') as " +
+ "select a, b from " + Table.ACIDTBL + " where a <= 3 union all " +
+ "select a, b from " + Table.NONACIDORCTBL + " where a >= 7 " +
+ "union all select a, b from " + Table.ACIDTBL + " where a = 5", confForTez);
+
+ List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " +
+ Table.NONACIDNONBUCKET + " order by a, b");
+ String expected0[][] = {
+ {"1\t2", "/1/000000_0"},
+ {"3\t4", "/1/000000_0"},
+ {"5\t6", "/3/000000_0"},
+ {"7\t8", "/2/000000_0"},
+ {"9\t10", "/2/000000_0"},
+ };
+ Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected0.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected0[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1]));
+ }
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ //ensure there is partition dir
+ runStatementOnDriver("insert into " + Table.NONACIDPART + " partition (p=1) values (100,110)");
+ //creates more files in that partition
+ for(FileStatus stat : status) {
+ int limit = 5;
+ Path p = stat.getPath();//dirs 1/, 2/, 3/
+ Path to = new Path(TEST_WAREHOUSE_DIR + "/" + Table.NONACIDPART+ "/p=1/" + p.getName());
+ while(limit-- > 0 && !fs.rename(p, to)) {
+ Thread.sleep(200);
+ }
+ if(limit <= 0) {
+ throw new IllegalStateException("Could not rename " + p + " to " + to);
+ }
+ }
+ /*
+ This is what we expect on disk
+ ekoifman:warehouse ekoifman$ tree nonacidpart/
+ nonacidpart/
+ └── p=1
+ ├── 000000_0
+ ├── 1
+ │ └── 000000_0
+ ├── 2
+ │ └── 000000_0
+ └── 3
+ └── 000000_0
+
+4 directories, 4 files
+ **/
+ //make the table ACID
+ runStatementOnDriver("alter table " + Table.NONACIDPART + " SET TBLPROPERTIES ('transactional'='true')");
+ rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID");
+ LOG.warn("after acid conversion:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ String[][] expected = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", "nonacidpart/p=1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/3/000000_0"}
+ };
+ Assert.assertEquals("Wrong row count", expected.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+ }
+
+ //run Major compaction
+ runStatementOnDriver("alter table " + Table.NONACIDPART + " partition (p=1) compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID");
+ LOG.warn("after major compaction:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ //verify data and layout
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " ac: " +
+ rs.get(i), rs.get(i).endsWith("nonacidpart/p=1/base_-9223372036854775808/bucket_00000"));
+ }
+
+ }
+ /**
+ * CTAS + Tez + Union creates a non-standard layout in table dir
+ * Each leg of the union places data into a subdir of the table/partition. Subdirs are named 1/, 2/, etc
+ * The way this currently works is that CTAS creates an Acid table but the insert statement writes
+ * the data in non-acid layout. Then on read, it's treated like an non-acid to acid conversion.
+ * Longer term CTAS should create acid layout from the get-go.
+ */
+ @Test
+ public void testCtasTezUnion() throws Exception {
+ HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+ setupTez(confForTez);
+ //CTAS with ACID target table
+ runStatementOnDriver("create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " +
+ "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
+ List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+ LOG.warn("after ctas:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+ /*
+ * Expected result 0th entry i the RecordIdentifier + data. 1st entry file before compact*/
+ String expected[][] = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"},
+ };
+ Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+ }
+ //perform some Update/Delete
+ runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b = 80 where a = 7");
+ runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5");
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+ LOG.warn("after update/delete:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ String[][] expected2 = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+ {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000019_0000019_0000/bucket_00000"}
+ };
+ Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+ }
+ //now make sure delete deltas are present
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ String[] expectedDelDelta = {"delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000"};
+ for(FileStatus stat : status) {
+ for(int i = 0; i < expectedDelDelta.length; i++) {
+ if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) {
+ expectedDelDelta[i] = null;
+ }
+ }
+ }
+ for(int i = 0; i < expectedDelDelta.length; i++) {
+ Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]);
+ }
+ //run Minor compaction
+ runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'minor'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+ LOG.warn("after compact minor:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+ //verify the data is the same
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ //todo: since HIVE-16669 is not done, Minor compact compacts insert delta as well - it should not
+ //Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+ }
+ //check we have right delete delta files after minor compaction
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ String[] expectedDelDelta2 = { "delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000", "delete_delta_0000019_0000020"};
+ for(FileStatus stat : status) {
+ for(int i = 0; i < expectedDelDelta2.length; i++) {
+ if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
+ expectedDelDelta2[i] = null;
+ break;
+ }
+ }
+ }
+ for(int i = 0; i < expectedDelDelta2.length; i++) {
+ Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]);
+ }
+ //run Major compaction
+ runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+ LOG.warn("after compact major:");
+ for (String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size());
+ for(int i = 0; i < expected2.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+ //everything is now in base/
+ Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000020/bucket_00000"));
+ }
+ }
// Ideally test like this should be a qfile test. However, the explain output from qfile is always
// slightly different depending on where the test is run, specifically due to file size estimation
private void testJoin(String engine, String joinType) throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
deleted file mode 100644
index 3dacf08..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql;
-
-/**
- * Same as parent class but covers Acid 2.0 tables
- */
-public class TestAcidOnTezWithSplitUpdate extends TestAcidOnTez {
- @Override
- String getTblProperties() {
- return "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')";
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 37a3757..fa6a2aa 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -456,7 +456,9 @@ minillap.query.files=acid_bucket_pruning.q,\
llap_stats.q,\
multi_count_distinct_null.q
-minillaplocal.query.files=acid_globallimit.q,\
+minillaplocal.query.files=\
+ acid_no_buckets.q, \
+ acid_globallimit.q,\
acid_vectorization_missing_cols.q,\
alter_merge_stats_orc.q,\
auto_join30.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 023d703..3a3d184 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -98,14 +98,26 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
// that will use it down below.
}
}
+ Table oldTable = context.getOldTable();
+ String oldTransactionalValue = null;
+ String oldTransactionalPropertiesValue = null;
+ for (String key : oldTable.getParameters().keySet()) {
+ if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
+ oldTransactionalValue = oldTable.getParameters().get(key);
+ }
+ if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+ oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
+ }
+ }
+
if (transactionalValuePresent) {
//normalize prop name
parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue);
}
- if ("true".equalsIgnoreCase(transactionalValue)) {
+ if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) {
+ //only need to check conformance if alter table enabled acid
if (!conformToAcid(newTable)) {
- throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
- " format (such as ORC)");
+ throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
}
if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@ -115,17 +127,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
hasValidTransactionalValue = true;
}
- Table oldTable = context.getOldTable();
- String oldTransactionalValue = null;
- String oldTransactionalPropertiesValue = null;
- for (String key : oldTable.getParameters().keySet()) {
- if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
- oldTransactionalValue = oldTable.getParameters().get(key);
- }
- if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
- oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
- }
- }
if (oldTransactionalValue == null ? transactionalValue == null
@@ -195,8 +196,7 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
if ("true".equalsIgnoreCase(transactionalValue)) {
if (!conformToAcid(newTable)) {
- throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
- " format (such as ORC)");
+ throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
}
if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@ -214,14 +214,12 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
throw new MetaException("'transactional' property of TBLPROPERTIES may only have value 'true'");
}
- // Check if table is bucketed and InputFormatClass/OutputFormatClass should implement
- // AcidInputFormat/AcidOutputFormat
+ /**
+ * Check that InputFormatClass/OutputFormatClass should implement
+ * AcidInputFormat/AcidOutputFormat
+ */
private boolean conformToAcid(Table table) throws MetaException {
StorageDescriptor sd = table.getSd();
- if (sd.getBucketColsSize() < 1) {
- return false;
- }
-
try {
Class inputFormatClass = Class.forName(sd.getInputFormat());
Class outputFormatClass = Class.forName(sd.getOutputFormat());
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 9c9d4e7..b3ef916 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -418,8 +418,8 @@ public enum ErrorMsg {
" does not support these operations."),
VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED(10296,
"Values clause with table constructor not yet supported"),
- ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that does not use " +
- "an AcidOutputFormat or is not bucketed", true),
+ ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that is " +
+ "not transactional", true),
ACID_NO_SORTED_BUCKETS(10298, "ACID insert, update, delete not supported on tables that are " +
"sorted, table {0}", true),
ALTER_TABLE_TYPE_PARTIAL_PARTITION_SPEC_NO_SUPPORTED(10299,
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 25ad1e9..bc265eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
@@ -285,6 +287,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
private transient int numFiles;
protected transient boolean multiFileSpray;
protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+ private transient boolean isBucketed = false;
private transient ObjectInspector[] partitionObjectInspectors;
protected transient HivePartitioner<HiveKey, Object> prtner;
@@ -345,6 +348,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
isNativeTable = !conf.getTableInfo().isNonNative();
isTemporary = conf.isTemporary();
multiFileSpray = conf.isMultiFileSpray();
+ this.isBucketed = hconf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
totalFiles = conf.getTotalFiles();
numFiles = conf.getNumFiles();
dpCtx = conf.getDynPartCtx();
@@ -791,9 +795,23 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
* Hive.copyFiles() will make one of them bucket_N_copy_M in the final location. The
* reset of acid (read path) doesn't know how to handle copy_N files except for 'original'
* files (HIVE-16177)*/
+ int writerId = -1;
+ if(!isBucketed) {
+ assert !multiFileSpray;
+ assert writerOffset == 0;
+ /**For un-bucketed tables, Deletes with ROW__IDs with different 'bucketNum' values can
+ * be written to the same bucketN file.
+ * N in this case is writerId and there is no relationship
+ * between the file name and any property of the data in it. Inserts will be written
+ * to bucketN file such that all {@link RecordIdentifier#getBucketProperty()} indeed
+ * contain writerId=N.
+ * Since taskId is unique (at least per statementId and thus
+ * per [delete_]delta_x_y_stmtId/) there will not be any copy_N files.*/
+ writerId = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
+ }
fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
- jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[writerOffset],
- rowInspector, reporter, 0);
+ jc, conf.getTableInfo(), writerId >= 0 ? writerId : bucketNum, conf,
+ fpaths.outPaths[writerOffset], rowInspector, reporter, 0);
if (LOG.isDebugEnabled()) {
LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
fpaths.outPaths[writerOffset]);
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 751fca8..69a9f9f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -964,6 +964,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final boolean allowSyntheticFileIds;
private final boolean isDefaultFs;
+ /**
+ * @param dir - root of partition dir
+ */
public BISplitStrategy(Context context, FileSystem fs, Path dir,
List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal, List<DeltaMetaData> deltas,
boolean[] covered, boolean allowSyntheticFileIds, boolean isDefaultFs) {
@@ -996,7 +999,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
- deltas, -1, logicalLen);
+ deltas, -1, logicalLen, dir);
splits.add(orcSplit);
}
}
@@ -1017,18 +1020,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* ACID split strategy is used when there is no base directory (when transactions are enabled).
*/
static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
- private Path dir;
+ Path dir;
private List<DeltaMetaData> deltas;
- private boolean[] covered;
- private int numBuckets;
private AcidOperationalProperties acidOperationalProperties;
-
+ /**
+ * @param dir root of partition dir
+ */
ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
AcidOperationalProperties acidOperationalProperties) {
this.dir = dir;
- this.numBuckets = numBuckets;
this.deltas = deltas;
- this.covered = covered;
this.acidOperationalProperties = acidOperationalProperties;
}
@@ -1234,6 +1235,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final UserGroupInformation ugi;
private final boolean allowSyntheticFileIds;
private SchemaEvolution evolution;
+ //this is the root of the partition in which the 'file' is located
+ private final Path rootDir;
public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
boolean allowSyntheticFileIds, boolean isDefaultFs) throws IOException {
@@ -1250,6 +1253,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
this.isOriginal = splitInfo.isOriginal;
this.deltas = splitInfo.deltas;
this.hasBase = splitInfo.hasBase;
+ this.rootDir = splitInfo.dir;
this.projColsUncompressedSize = -1;
this.deltaSplits = splitInfo.getSplits();
this.allowSyntheticFileIds = allowSyntheticFileIds;
@@ -1361,7 +1365,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
fileKey = new SyntheticFileId(file);
}
return new OrcSplit(file.getPath(), fileKey, offset, length, hosts,
- orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen);
+ orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen, rootDir);
}
private static final class OffsetAndLength { // Java cruft; pair of long.
@@ -1641,7 +1645,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
pathFutures.add(ecs.submit(fileGenerator));
}
- boolean isTransactionalTableScan =
+ boolean isTransactionalTableScan =//this never seems to be set correctly
HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION);
TypeDescription readerSchema =
@@ -1932,16 +1936,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
final OrcSplit split = (OrcSplit) inputSplit;
final Path path = split.getPath();
-
Path root;
if (split.hasBase()) {
if (split.isOriginal()) {
- root = path.getParent();
+ root = split.getRootDir();
} else {
root = path.getParent().getParent();
+ assert root.equals(split.getRootDir()) : "root mismatch: baseDir=" + split.getRootDir() +
+ " path.p.p=" + root;
}
- } else {//here path is a delta/ but above it's a partition/
- root = path;
+ } else {
+ throw new IllegalStateException("Split w/o base: " + path);
}
// Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat.
@@ -2037,21 +2042,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
};
}
- private static Path findOriginalBucket(FileSystem fs,
- Path directory,
- int bucket) throws IOException {
- for(FileStatus stat: fs.listStatus(directory)) {
- if(stat.getLen() <= 0) {
- continue;
- }
- AcidOutputFormat.Options bucketInfo =
- AcidUtils.parseBaseOrDeltaBucketFilename(stat.getPath(), fs.getConf());
- if(bucketInfo.getBucketId() == bucket) {
- return stat.getPath();
- }
- }
- throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + directory);
- }
static Reader.Options createOptionsForReader(Configuration conf) {
/**
@@ -2275,20 +2265,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
) throws IOException {
Reader reader = null;
boolean isOriginal = false;
- if (baseDirectory != null) {
- Path bucketFile;
+ if (baseDirectory != null) {//this is NULL for minor compaction
+ Path bucketFile = null;
if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);
} else {
+ /**we don't know which file to start reading -
+ * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/
isOriginal = true;
- bucketFile = findOriginalBucket(baseDirectory.getFileSystem(conf),
- baseDirectory, bucket);
}
- reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
+ if(bucketFile != null) {
+ reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
+ }
}
OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
.isCompacting(true)
- .rootPath(baseDirectory);
+ .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null);
return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 97c4e3d..cbbb4c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.BucketCodec;
@@ -29,7 +31,6 @@ import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.orc.OrcUtils;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.AcidStats;
import org.apache.orc.impl.OrcAcidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +69,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final RecordIdentifier maxKey;
// an extra value so that we can return it while reading ahead
private OrcStruct extraValue;
-
/**
* A RecordIdentifier extended with the current transaction id. This is the
* key of our merge sort with the originalTransaction, bucket, and rowId
@@ -294,9 +294,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* Running multiple Insert statements on the same partition (of non acid table) creates files
* like so: 00000_0, 00000_0_copy1, 00000_0_copy2, etc. So the OriginalReaderPair must treat all
* of these files as part of a single logical bucket file.
+ *
+ * Also, for unbucketed (non acid) tables, there are no guarantees where data files may be placed.
+ * For example, CTAS+Tez+Union creates subdirs 1/, 2/, etc for each leg of the Union. Thus the
+ * data file need not be an immediate child of partition dir. All files for a given writerId are
+ * treated as one logical unit to assign {@link RecordIdentifier}s to them consistently.
*
* For Compaction, where each split includes the whole bucket, this means reading over all the
* files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket.
+ * For unbucketed tables, a Compaction split is all files written by a given writerId.
*
* For a read after the table is marked transactional but before it's rewritten into a base/
* by compaction, each of the original files may be split into many pieces. For each split we
@@ -305,7 +311,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* split of the original file and used to filter rows from all the deltas. The ROW__ID.rowid for
* the rows of the 'original' file of course, must be assigned from the beginning of logical
* bucket. The last split of the logical bucket, i.e. the split that has the end of last file,
- * should include all insert events from deltas.
+ * should include all insert events from deltas (last sentence is obsolete for Acid 2: HIVE-17320)
*/
private static abstract class OriginalReaderPair implements ReaderPair {
OrcStruct nextRecord;
@@ -407,18 +413,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
RecordIdentifier newMaxKey = maxKey;
recordReader = reader.rowsOptions(options);
/**
- * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc We don't
- * know N a priori so if this is true, then the current split is from 0000_0_copyN file.
+ * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copy_N. etc We don't
+ * know N a priori so if this is true, then the current split is from 0000_0_copy_N file.
* It's needed to correctly set maxKey. In particular, set maxKey==null if this split
* is the tail of the last file for this logical bucket to include all deltas written after
- * non-acid to acid table conversion.
+ * non-acid to acid table conversion (todo: HIVE-17320).
+ * Also, see comments at {@link OriginalReaderPair} about unbucketed tables.
*/
- boolean isLastFileForThisBucket = false;
+ boolean isLastFileForThisBucket = true;
boolean haveSeenCurrentFile = false;
long rowIdOffsetTmp = 0;
- if (mergerOptions.getCopyIndex() > 0) {
+ {
//the split is from something other than the 1st file of the logical bucket - compute offset
-
AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
conf, validTxnList, false, true);
for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
@@ -467,23 +473,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
}
}
- } else {
- rowIdOffset = 0;
- isLastFileForThisBucket = true;
- AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
- conf, validTxnList, false, true);
- int numFilesInBucket = 0;
- for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
- AcidOutputFormat.Options bucketOptions =
- AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
- if (bucketOptions.getBucketId() == bucketId) {
- numFilesInBucket++;
- if (numFilesInBucket > 1) {
- isLastFileForThisBucket = false;
- break;
- }
- }
- }
}
if (!isLastFileForThisBucket && maxKey == null) {
/*
@@ -651,6 +640,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
/**
* Find the key range for original bucket files.
+ * For unbucketed tables the insert event data is still written to bucket_N file except that
+ * N is just a writer ID - it still matches {@link RecordIdentifier#getBucketProperty()}. For
+ * 'original' files (unbucketed) the same applies. A file 000000_0 encodes a taskId/writerId and
+ * at read time we synthesize {@link RecordIdentifier#getBucketProperty()} to match the file name
+ * and so the same bucketProperty is used here to create minKey/maxKey, i.e. these keys are valid
+ * to filter data from delete_delta files even for unbucketed tables.
* @param reader the reader
* @param bucket the bucket number we are reading
* @param options the options for reading with
@@ -740,7 +735,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
*/
static Reader.Options createEventOptions(Reader.Options options) {
Reader.Options result = options.clone();
- //result.range(options.getOffset(), Long.MAX_VALUE);WTF?
result.include(options.getInclude());
// slide the column names down by 6 for the name array
@@ -755,11 +749,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
return result;
}
+ /**
+ * {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts.
+ * This makes the "context" explicit.
+ */
static class Options {
private int copyIndex = 0;
private boolean isCompacting = false;
private Path bucketPath;
private Path rootPath;
+ private boolean isMajorCompaction = false;
+ private boolean isDeleteReader = false;
Options copyIndex(int copyIndex) {
assert copyIndex >= 0;
this.copyIndex = copyIndex;
@@ -767,6 +767,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
Options isCompacting(boolean isCompacting) {
this.isCompacting = isCompacting;
+ assert !isDeleteReader;
return this;
}
Options bucketPath(Path bucketPath) {
@@ -777,6 +778,16 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
this.rootPath = rootPath;
return this;
}
+ Options isMajorCompaction(boolean isMajor) {
+ this.isMajorCompaction = isMajor;
+ assert !isDeleteReader;
+ return this;
+ }
+ Options isDeleteReader(boolean isDeleteReader) {
+ this.isDeleteReader = isDeleteReader;
+ assert !isCompacting;
+ return this;
+ }
/**
* 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix
*/
@@ -788,7 +799,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
/**
* Full path to the data file
- * @return
*/
Path getBucketPath() {
return bucketPath;
@@ -797,6 +807,22 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* Partition folder (Table folder if not partitioned)
*/
Path getRootPath() { return rootPath; }
+ /**
+ * @return true if major compaction, false if minor
+ */
+ boolean isMajorCompaction() {
+ return isMajorCompaction && isCompacting;
+ }
+ boolean isMinorCompaction() {
+ return !isMajorCompaction && isCompacting;
+ }
+ /**
+ * true if this is only processing delete deltas to load in-memory table for
+ * vectorized reader
+ */
+ boolean isDeleteReader() {
+ return isDeleteReader;
+ }
}
/**
* Create a reader that merge sorts the ACID events together.
@@ -820,6 +846,39 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
this.offset = options.getOffset();
this.length = options.getLength();
this.validTxnList = validTxnList;
+ /**
+ * @since Hive 3.0
+ * With split update (HIVE-14035) we have base/, delta/ and delete_delta/ - the latter only
+ * has Delete events and the others only have Insert events. Thus {@link #baseReader} is
+ * a split of a file in base/ or delta/.
+ *
+ * For Compaction, each split (for now) is a logical bucket, i.e. all files from base/ + delta(s)/
+ * for a given bucket ID and delete_delta(s)/
+ *
+ * For bucketed tables, the data files are named bucket_N and all rows in this file are such
+ * that {@link org.apache.hadoop.hive.ql.io.BucketCodec#decodeWriterId(int)} of
+ * {@link RecordIdentifier#getBucketProperty()} is N. This is currently true for all types of
+ * files but may not be true for delete_delta/ files in the future.
+ *
+ * For un-bucketed tables, the system is designed so that it works when there is no relationship
+ * between delete_delta file name (bucket_N) and the value of {@link RecordIdentifier#getBucketProperty()}.
+ * (Later this may be optimized to take advantage of situations where it is known that
+ * bucket_N matches bucketProperty().) This implies that for a given {@link #baseReader} all
+ * files in delete_delta/ have to be opened ({@link ReaderPair} created). Insert events are
+ * still written such that N in file name (writerId) matches what's in bucketProperty().
+ *
+ * Compactor for un-bucketed tables works exactly the same as for bucketed ones though it
+ * should be optimized (see HIVE-17206). In particular, each split is a set of files
+ * created by a writer with the same writerId, i.e. all bucket_N files across base/ &
+ * delta/ for the same N. Unlike bucketed tables, there is no relationship between
+ * any values in user columns and the file name.
+ * The maximum N is determined by the number of writers the system chose for the "largest"
+ * write into a given partition.
+ *
+ * In both cases, Compactor should be changed so that Minor compaction is run very often and
+ * only compacts delete_delta/. Major compaction can do what it does now.
+ */
+ boolean isBucketed = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
TypeDescription typeDescr =
OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
@@ -829,16 +888,26 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
// modify the options to reflect the event instead of the base row
Reader.Options eventOptions = createEventOptions(options);
- if (reader == null) {
+ if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) ||
+ mergerOptions.isDeleteReader()) {
+ //for minor compaction, there is no progress report and we don't filter deltas
baseReader = null;
minKey = maxKey = null;
+ assert reader == null : "unexpected input reader during minor compaction: " +
+ mergerOptions.getRootPath();
} else {
KeyInterval keyInterval;
- // find the min/max based on the offset and length (and more for 'original')
- if (isOriginal) {
- keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+ if (mergerOptions.isCompacting()) {
+ assert mergerOptions.isMajorCompaction();
+ //compaction doesn't filter deltas but *may* have a reader for 'base'
+ keyInterval = new KeyInterval(null, null);
} else {
- keyInterval = discoverKeyBounds(reader, options);
+ // find the min/max based on the offset and length (and more for 'original')
+ if (isOriginal) {
+ keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+ } else {
+ keyInterval = discoverKeyBounds(reader, options);
+ }
}
LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
// use the min/max instead of the byte range
@@ -849,8 +918,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
if(mergerOptions.isCompacting()) {
pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
conf, validTxnList);
- }
- else {
+ } else {
pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
}
@@ -868,35 +936,31 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
baseReader = pair.getRecordReader();
}
- // we always want to read all of the deltas
- eventOptions.range(0, Long.MAX_VALUE);
if (deltaDirectory != null) {
+ /*whatever SARG may be applicable to base, it's not applicable to delete_delta since it has no
+ * user columns
+ * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/
+ Reader.Options deltaEventOptions = eventOptions.clone()
+ .searchArgument(null, null).range(0, Long.MAX_VALUE);
for(Path delta: deltaDirectory) {
if(!mergerOptions.isCompacting() && !AcidUtils.isDeleteDelta(delta)) {
//all inserts should be in baseReader for normal read so this should always be delete delta if not compacting
throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
}
ReaderKey key = new ReaderKey();
- Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
- FileSystem fs = deltaFile.getFileSystem(conf);
- long length = OrcAcidUtils.getLastFlushLength(fs, deltaFile);
- if (length != -1 && fs.exists(deltaFile)) {
- Reader deltaReader = OrcFile.createReader(deltaFile,
- OrcFile.readerOptions(conf).maxLength(length));
- Reader.Options deltaEventOptions = null;
- if(eventOptions.getSearchArgument() != null) {
- // Turn off the sarg before pushing it to delta. We never want to push a sarg to a delta as
- // it can produce wrong results (if the latest valid version of the record is filtered out by
- // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
- // unless the delta only has insert events
- AcidStats acidStats = OrcAcidUtils.parseAcidStats(deltaReader);
- if(acidStats.deletes > 0 || acidStats.updates > 0) {
- deltaEventOptions = eventOptions.clone().searchArgument(null, null);
- }
+ for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
+ FileSystem fs = deltaFile.getFileSystem(conf);
+ if(!fs.exists(deltaFile)) {
+ continue;
}
+ /* side files are only created by streaming ingest. If this is a compaction, we may
+ * have an insert delta/ here with side files there because the original writer died.*/
+ long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile));
+ assert length >= 0;
+ Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length));
ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
- deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
+ deltaEventOptions, deltaDir.getStatementId());
if (deltaPair.nextRecord() != null) {
readers.put(key, deltaPair);
}
@@ -921,6 +985,59 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
}
+ /**
+ * Determines the set of delta files for which a {@link ReaderPairAcid} should be created
+ * for a given delta/ (or delete_delta/) directory.
+ * For unbucketed tables {@code bucket} can be thought of as a write tranche (writer ID).
+ *
+ * @param deltaDirectory the delta_x_y/ or delete_delta_x_y/ directory to take files from
+ * @param bucket the bucket ID (the writer ID for unbucketed tables) used to build the
+ * bucket_N file name when a single file is selected
+ * @param conf configuration used to obtain the {@link FileSystem} when listing a delete_delta/
+ * @param mergerOptions the reader context; in this method it is consulted only by assertions
+ * @param isBucketed true if the table is bucketed, in which case exactly one bucket file path
+ * is returned
+ * @return the candidate file paths. When a single bucket file path is built from
+ * {@code bucket}, this method does not check that the file exists.
+ * @throws IOException if the file system cannot be obtained or the directory cannot be listed
+ */
+ static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Configuration conf,
+ Options mergerOptions, boolean isBucketed) throws IOException {
+ if(isBucketed) {
+ /**
+ * for bucketed tables (for now) we always trust that the N in bucketN file name means that
+ * all records have {@link RecordIdentifier#getBucketProperty()} encoding bucketId = N. This
+ * means that a delete event in bucketN can only modify an insert in another bucketN file for
+ * the same N. (Down the road we may trust it only in certain delta dirs)
+ *
+ * Compactor takes all types of deltas for a given bucket. For regular read, any file that
+ * contains (only) insert events is treated as base and only
+ * delete_delta/ are treated as deltas.
+ * The assertion below therefore requires a delete_delta/ directory unless compacting.
+ */
+ assert (!mergerOptions.isCompacting &&
+ deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+ ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory;
+ Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
+ return new Path[]{deltaFile};
+ }
+ /**
+ * For unbucketed tables insert events are also stored in bucketN files but here N is
+ * the writer ID. In delta_x_y we can trust that N matches the bucket ID encoded in
+ * {@link RecordIdentifier#getBucketProperty()}, but it's not required since we can't trust N
+ * for delete_delta_x_x/bucketN. Thus we always have to take all files in a delete_delta.
+ * For regular read, any file that has (only) insert events is treated as base so
+ * {@code deltaDirectory} can only be delete_delta and so we take all files in it.
+ * For compacting, every split contains base/bN + delta(s)/bN + delete_delta(s){all buckets} for
+ * a given N.
+ */
+ if(deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+ //un-bucketed table - get all files. Taking all delete events would not be wrong for bucketed
+ //tables either, but there it's more efficient to only take those that belong to 'bucket'
+ //(assuming we trust the file name), which is what the isBucketed branch above does.
+ FileSystem fs = deltaDirectory.getFileSystem(conf);
+ FileStatus[] dataFiles = fs.listStatus(deltaDirectory, AcidUtils.bucketFileFilter);
+ Path[] deltaFiles = new Path[dataFiles.length];
+ int i = 0;
+ for (FileStatus stat : dataFiles) {
+ deltaFiles[i++] = stat.getPath();
+ }//todo: need a test where we actually have more than 1 file
+ return deltaFiles;
+ }
+ //if here it must be delta_x_y - insert events only, so we must be compacting
+ assert mergerOptions.isCompacting() : "Expected to be called as part of compaction";
+ Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
+ return new Path[] {deltaFile};
+
+ }
+
@VisibleForTesting
RecordIdentifier getMinKey() {
return minKey;
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 429960b..1e19a91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -243,7 +243,8 @@ public class OrcRecordUpdater implements RecordUpdater {
}
if (options.getMinimumTransactionId() != options.getMaximumTransactionId()
&& !options.isWritingBase()){
- flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8,
+ //throw if file already exists as that should never happen
+ flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), false, 8,
options.getReporter());
flushLengths.writeLong(0);
OrcInputFormat.SHIMS.hflush(flushLengths);
@@ -349,6 +350,12 @@ public class OrcRecordUpdater implements RecordUpdater {
return newInspector;
}
}
+
+ /**
+ * The INSERT event always uses the {@link #bucket} that this {@link RecordUpdater} was created
+ * with, thus even for unbucketed tables the N in the bucket_N file name matches
+ * writerId/bucketId, including for late split
+ */
private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
throws IOException {
this.operation.set(operation);
@@ -394,6 +401,11 @@ public class OrcRecordUpdater implements RecordUpdater {
Integer currentBucket = null;
if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+ /**
+ * make sure bucketProperty in the delete event is from the {@code row} rather than whatever
+ * {@link #bucket} is. For bucketed tables, the 2 must agree on the bucketId encoded in it but
+ * not necessarily on the whole value. For unbucketed tables there is no relationship.
+ */
currentBucket = setBucket(bucketInspector.get(
recIdInspector.getStructFieldData(rowValue, bucketField)), operation);
// Initialize a deleteEventWriter if not yet done. (Lazy initialization)