You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2014/04/11 00:32:21 UTC
svn commit: r1586488 [2/2] - in /hive/trunk:
metastore/src/java/org/apache/hadoop/hive/metastore/
metastore/src/java/org/apache/hadoop/hive/metastore/txn/
metastore/src/test/org/apache/hadoop/hive/metastore/txn/ ql/
ql/src/java/org/apache/hadoop/hive/q...
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Progressable;
+import org.apache.thrift.TException;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Stack;
+
+/**
+ * Super class for all of the compactor test modules.
+ *
+ * <p>The constructor points a {@link HiveConf} at the test transaction database
+ * ({@link TxnDbUtil#setConfValues}), wipes it ({@link TxnDbUtil#cleanDb}), and opens a metastore
+ * client plus a {@link CompactionTxnHandler}. Helpers then create tables/partitions and write fake
+ * base, delta and legacy bucket files under {@code java.io.tmpdir/compactor_test_tables}.
+ * Compactor threads are run synchronously on the calling thread via {@link #startInitiator},
+ * {@link #startWorker} and {@link #startCleaner}.
+ */
+public abstract class CompactorTest {
+  static final private String CLASS_NAME = CompactorTest.class.getName();
+  static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  /** Direct handle on the compaction/transaction tables. */
+  protected CompactionTxnHandler txnHandler;
+  /** Metastore client used to create the test tables and partitions. */
+  protected IMetaStoreClient ms;
+  // Not read in this class; presumably used by subclasses.
+  protected long sleepTime = 1000;
+
+  // Stop flag handed to each compactor thread; startThread() sets it to true before running one.
+  private MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
+  // Root directory under which every test table's data is written.
+  private File tmpdir;
+
+  protected CompactorTest() throws Exception {
+    HiveConf conf = new HiveConf();
+    TxnDbUtil.setConfValues(conf);
+    TxnDbUtil.cleanDb();
+    ms = new HiveMetaStoreClient(conf);
+    txnHandler = new CompactionTxnHandler(conf);
+    tmpdir = new File(System.getProperty("java.io.tmpdir") +
+        System.getProperty("file.separator") + "compactor_test_tables");
+    tmpdir.mkdir();
+    // NOTE: File.deleteOnExit() only removes the directory if it is empty when the JVM exits, so
+    // table data written beneath it is left behind.
+    tmpdir.deleteOnExit();
+  }
+
+  /** Runs an Initiator synchronously with the given configuration. */
+  protected void startInitiator(HiveConf conf) throws Exception {
+    startThread('i', conf);
+  }
+
+  /** Runs a Worker synchronously with the given configuration. */
+  protected void startWorker(HiveConf conf) throws Exception {
+    startThread('w', conf);
+  }
+
+  /** Runs a Cleaner synchronously with the given configuration. */
+  protected void startCleaner(HiveConf conf) throws Exception {
+    startThread('c', conf);
+  }
+
+  /** Creates an unsorted table with no table parameters. */
+  protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException {
+    return newTable(dbName, tableName, partitioned, new HashMap<String, String>(), null);
+  }
+
+  /** Creates an unsorted table with the given table parameters. */
+  protected Table newTable(String dbName, String tableName, boolean partitioned,
+                           Map<String, String> parameters) throws TException {
+    return newTable(dbName, tableName, partitioned, parameters, null);
+  }
+
+  /**
+   * Creates a table in the metastore whose data lives under the scratch directory.
+   *
+   * @param dbName database to create the table in
+   * @param tableName table name; also used as the table's directory name
+   * @param partitioned if true the table gets a single string partition key {@code ds}
+   * @param parameters table parameters
+   * @param sortCols sort columns for the storage descriptor, or null for none
+   * @return the table object that was passed to the metastore
+   * @throws TException if the metastore call fails
+   */
+  protected Table newTable(String dbName, String tableName, boolean partitioned,
+                           Map<String, String> parameters, List<Order> sortCols)
+      throws TException {
+    Table table = new Table();
+    table.setTableName(tableName);
+    table.setDbName(dbName);
+    table.setOwner("me");
+    table.setSd(newStorageDescriptor(getLocation(tableName, null), sortCols));
+    if (partitioned) {
+      List<FieldSchema> partKeys = new ArrayList<FieldSchema>(1);
+      partKeys.add(new FieldSchema("ds", "string", "no comment"));
+      table.setPartitionKeys(partKeys);
+    }
+
+    table.setParameters(parameters);
+
+    ms.createTable(table);
+    return table;
+  }
+
+  /** Adds an unsorted partition {@code ds=value} to the given table. */
+  protected Partition newPartition(Table t, String value) throws Exception {
+    return newPartition(t, value, null);
+  }
+
+  /**
+   * Adds partition {@code ds=value} to the given table in the metastore.
+   *
+   * @param t table to add the partition to
+   * @param value value of the {@code ds} partition key
+   * @param sortCols sort columns for the partition's storage descriptor, or null for none
+   * @return the partition object that was passed to the metastore
+   */
+  protected Partition newPartition(Table t, String value, List<Order> sortCols) throws Exception {
+    Partition part = new Partition();
+    part.addToValues(value);
+    part.setDbName(t.getDbName());
+    part.setTableName(t.getTableName());
+    part.setSd(newStorageDescriptor(getLocation(t.getTableName(), value), sortCols));
+    part.setParameters(new HashMap<String, String>());
+    ms.add_partition(part);
+    return part;
+  }
+
+  /** Opens a single transaction as the current OS user and returns its id. */
+  protected long openTxn() throws MetaException {
+    List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, System.getProperty("user.name"),
+        Worker.hostname())).getTxn_ids();
+    return txns.get(0);
+  }
+
+  /** Writes a {@code delta_minTxn_maxTxn} directory with {@code numRecords} rows in each of 2 buckets. */
+  protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
+                              int numRecords) throws Exception {
+    addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
+  }
+
+  /** Writes a {@code base_maxTxn} directory with {@code numRecords} rows in each of 2 buckets. */
+  protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn,
+                             int numRecords) throws Exception {
+    addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
+  }
+
+  /** Writes 2 flat (non base/delta) bucket files with {@code numRecords} rows each. */
+  protected void addLegacyFile(HiveConf conf, Table t, Partition p,
+                               int numRecords) throws Exception {
+    addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, 2, true);
+  }
+
+  /** Like the 6-argument overload, but with a configurable bucket count and missing bucket 0. */
+  protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
+                              int numRecords, int numBuckets, boolean allBucketsPresent)
+      throws Exception {
+    addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
+  }
+
+  /** Like the 5-argument overload, but with a configurable bucket count and missing bucket 0. */
+  protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn,
+                             int numRecords, int numBuckets, boolean allBucketsPresent)
+      throws Exception {
+    addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent);
+  }
+
+  /** Like the 4-argument overload, but with a configurable bucket count and missing bucket 0. */
+  protected void addLegacyFile(HiveConf conf, Table t, Partition p,
+                               int numRecords, int numBuckets, boolean allBucketsPresent)
+      throws Exception {
+    addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, numBuckets, allBucketsPresent);
+  }
+
+  /**
+   * Lists the immediate children of the table's (or partition's) directory.
+   *
+   * @param p partition to list, or null to list the table directory
+   * @return paths of the entries found; empty if the directory does not exist and the
+   *     FileSystem reports that by returning null
+   */
+  protected List<Path> getDirectories(HiveConf conf, Table t, Partition p) throws Exception {
+    String partValue = (p == null) ? null : p.getValues().get(0);
+    Path dir = new Path(getLocation(t.getTableName(), partValue));
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stats = fs.listStatus(dir);
+    // Older Hadoop FileSystem implementations return null (rather than throwing) for a missing
+    // path; avoid an NPE so the caller's size assertion reports the problem instead.
+    if (stats == null) {
+      return new ArrayList<Path>(0);
+    }
+    List<Path> paths = new ArrayList<Path>(stats.length);
+    for (FileStatus stat : stats) {
+      paths.add(stat.getPath());
+    }
+    return paths;
+  }
+
+  /** Opens {@code num} transactions and commits each of them. */
+  protected void burnThroughTransactions(int num) throws MetaException, NoSuchTxnException, TxnAbortedException {
+    OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost"));
+    for (long tid : rsp.getTxn_ids()) {
+      txnHandler.commitTxn(new CommitTxnRequest(tid));
+    }
+  }
+
+  // Builds a two-column (a varchar(25), b int) descriptor, bucketed on "a" into one bucket, that
+  // uses the mock input/output formats below.
+  private StorageDescriptor newStorageDescriptor(String location, List<Order> sortCols) {
+    StorageDescriptor sd = new StorageDescriptor();
+    List<FieldSchema> cols = new ArrayList<FieldSchema>(2);
+    cols.add(new FieldSchema("a", "varchar(25)", "still no comment"));
+    cols.add(new FieldSchema("b", "int", "comment"));
+    sd.setCols(cols);
+    sd.setLocation(location);
+    sd.setInputFormat(MockInputFormat.class.getName());
+    sd.setOutputFormat(MockOutputFormat.class.getName());
+    sd.setNumBuckets(1);
+    SerDeInfo serde = new SerDeInfo();
+    serde.setSerializationLib(LazySimpleSerDe.class.getName());
+    sd.setSerdeInfo(serde);
+    List<String> bucketCols = new ArrayList<String>(1);
+    bucketCols.add("a");
+    sd.setBucketCols(bucketCols);
+
+    if (sortCols != null) {
+      sd.setSortCols(sortCols);
+    }
+    return sd;
+  }
+
+  // Not done with @Before because each test needs to control the configuration handed to the
+  // thread. run() is invoked directly, so the compactor executes on the calling thread, with the
+  // stop flag already set to true.
+  private void startThread(char type, HiveConf conf) throws Exception {
+    TxnDbUtil.setConfValues(conf);
+    CompactorThread t;
+    switch (type) {
+      case 'i': t = new Initiator(); break;
+      case 'w': t = new Worker(); break;
+      case 'c': t = new Cleaner(); break;
+      default: throw new IllegalArgumentException("Huh? Unknown thread type.");
+    }
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(conf);
+    stop.boolVal = true;
+    t.init(stop);
+    t.run();
+  }
+
+  // Returns <tmpdir>/<tableName>, or <tmpdir>/<tableName>/ds=<partValue> when partValue is set.
+  private String getLocation(String tableName, String partValue) {
+    String location = tmpdir.getAbsolutePath() +
+        System.getProperty("file.separator") + tableName;
+    if (partValue != null) {
+      location += System.getProperty("file.separator") + "ds=" + partValue;
+    }
+    return location;
+  }
+
+  private enum FileType {BASE, DELTA, LEGACY}
+
+  /**
+   * Writes fake bucket files for a base, delta, or legacy (flat) layout.
+   *
+   * <p>Each record is a {@link RecordIdentifier} built from {@code (maxTxn - 1, bucket, i)},
+   * serialized with {@code write}, followed by a fixed line of text. When
+   * {@code allBucketsPresent} is false, bucket 0 is skipped to simulate a missing bucket.
+   */
+  private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
+                       int numRecords, FileType type, int numBuckets,
+                       boolean allBucketsPresent) throws Exception {
+    String partValue = (p == null) ? null : p.getValues().get(0);
+    Path location = new Path(getLocation(t.getTableName(), partValue));
+    String filename = null;
+    switch (type) {
+      case BASE: filename = "base_" + maxTxn; break;
+      case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
+      case LEGACY: break; // handled below
+    }
+
+    FileSystem fs = FileSystem.get(conf);
+    for (int bucket = 0; bucket < numBuckets; bucket++) {
+      if (bucket == 0 && !allBucketsPresent) continue; // skip one
+      Path partFile;
+      if (type == FileType.LEGACY) {
+        partFile = new Path(location, String.format(AcidUtils.BUCKET_DIGITS, bucket) + "_0");
+      } else {
+        Path dir = new Path(location, filename);
+        fs.mkdirs(dir);
+        partFile = AcidUtils.createBucketFile(dir, bucket);
+      }
+      FSDataOutputStream out = fs.create(partFile);
+      try {
+        for (int i = 0; i < numRecords; i++) {
+          RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i);
+          ri.write(out);
+          out.writeBytes("mary had a little lamb its fleece was white as snow\n");
+        }
+      } finally {
+        // Close even if a write fails so the file handle is not leaked.
+        out.close();
+      }
+    }
+  }
+
+  /**
+   * Input format whose only working method is {@link #getRawReader}, which reads the files
+   * written by addFile(). Every other method is a stub.
+   */
+  static class MockInputFormat implements AcidInputFormat<Text> {
+
+    @Override
+    public AcidInputFormat.RowReader<Text> getReader(InputSplit split,
+                                                     Options options) throws
+        IOException {
+      return null;
+    }
+
+    /**
+     * Collects the existing bucket files for {@code bucket} from the base (or legacy) directory
+     * and each delta directory, and returns a reader over them.
+     */
+    @Override
+    public RawReader<Text> getRawReader(Configuration conf, boolean collapseEvents, int bucket,
+                                        ValidTxnList validTxnList,
+                                        Path baseDirectory, Path... deltaDirectory) throws IOException {
+
+      List<Path> filesToRead = new ArrayList<Path>();
+      if (baseDirectory != null) {
+        Path p;
+        if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
+          p = AcidUtils.createBucketFile(baseDirectory, bucket);
+        } else {
+          // Legacy layout: use the same per-bucket naming as addFile() so each bucket reads its
+          // own file instead of always reading bucket 0's.
+          p = new Path(baseDirectory, String.format(AcidUtils.BUCKET_DIGITS, bucket) + "_0");
+        }
+        FileSystem fs = p.getFileSystem(conf);
+        if (fs.exists(p)) filesToRead.add(p);
+      }
+      for (Path delta : deltaDirectory) {
+        Path p = AcidUtils.createBucketFile(delta, bucket);
+        FileSystem fs = p.getFileSystem(conf);
+        if (fs.exists(p)) filesToRead.add(p);
+      }
+      return new MockRawReader(conf, filesToRead);
+    }
+
+    @Override
+    public InputSplit[] getSplits(JobConf entries, int i) throws IOException {
+      return new InputSplit[0];
+    }
+
+    @Override
+    public RecordReader<NullWritable, Text> getRecordReader(InputSplit inputSplit, JobConf entries,
+                                                            Reporter reporter) throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws
+        IOException {
+      return false;
+    }
+  }
+
+  /**
+   * Raw reader over files written by addFile(). Each successful next() decodes one
+   * RecordIdentifier followed by one line of text. Files are kept on a stack, so they are
+   * consumed in the reverse of the order supplied.
+   */
+  static class MockRawReader implements AcidInputFormat.RawReader<Text> {
+    private Stack<Path> filesToRead;
+    private Configuration conf;
+    private FSDataInputStream is = null;
+    private FileSystem fs;
+
+    MockRawReader(Configuration conf, List<Path> files) throws IOException {
+      filesToRead = new Stack<Path>();
+      for (Path file : files) filesToRead.push(file);
+      this.conf = conf;
+      fs = FileSystem.get(conf);
+    }
+
+    @Override
+    public ObjectInspector getObjectInspector() {
+      return null;
+    }
+
+    @Override
+    public boolean next(RecordIdentifier identifier, Text text) throws IOException {
+      while (true) {
+        if (is == null) {
+          // Open the next file
+          if (filesToRead.empty()) return false;
+          Path p = filesToRead.pop();
+          LOG.debug("Reading records from " + p.toString());
+          is = fs.open(p);
+        }
+        String line = null;
+        try {
+          identifier.readFields(is);
+          line = is.readLine();
+        } catch (EOFException e) {
+          // Expected once the current file is exhausted; handled below by moving on.
+        }
+        if (line != null) {
+          text.set(line);
+          return true;
+        }
+        // The current file is done: close it so the handle is not leaked, then try the next one.
+        is.close();
+        is = null;
+      }
+    }
+
+    @Override
+    public RecordIdentifier createKey() {
+      return new RecordIdentifier();
+    }
+
+    @Override
+    public Text createValue() {
+      return new Text();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (is != null) {
+        is.close();
+        is = null;
+      }
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return 0;
+    }
+  }
+
+  // Only getRawRecordWriter() does anything here; this class exists mainly so that the tables and
+  // partitions created by these tests have an output format. Test data itself is written directly
+  // by addFile(). NOTE(review): the original author suspected the remaining stubs do the wrong
+  // thing if they are ever called.
+  static class MockOutputFormat implements AcidOutputFormat<Text> {
+
+    @Override
+    public RecordUpdater getRecordUpdater(Path path, Options options) throws
+        IOException {
+      return null;
+    }
+
+    @Override
+    public FSRecordWriter getRawRecordWriter(Path path, Options options) throws IOException {
+      return new MockRecordWriter(path, options);
+    }
+
+    @Override
+    public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+                                              Class<? extends Writable> valueClass,
+                                              boolean isCompressed, Properties tableProperties,
+                                              Progressable progress) throws IOException {
+      return null;
+    }
+
+    @Override
+    public RecordWriter<NullWritable, Text> getRecordWriter(FileSystem fileSystem, JobConf entries,
+                                                            String s,
+                                                            Progressable progressable) throws
+        IOException {
+      return null;
+    }
+
+    @Override
+    public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOException {
+
+    }
+  }
+
+  // Writer returned by MockOutputFormat.getRawRecordWriter(). It writes each value (expected to be
+  // Text) as one line of the file named by AcidUtils.createFilename(basedir, options).
+  static class MockRecordWriter implements FSRecordWriter {
+    private FSDataOutputStream os;
+
+    MockRecordWriter(Path basedir, AcidOutputFormat.Options options) throws IOException {
+      FileSystem fs = FileSystem.get(options.getConfiguration());
+      Path p = AcidUtils.createFilename(basedir, options);
+      os = fs.create(p);
+    }
+
+    @Override
+    public void write(Writable w) throws IOException {
+      Text t = (Text)w;
+      os.writeBytes(t.toString());
+      os.writeBytes("\n");
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      os.close();
+    }
+  }
+
+
+}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for the compactor Cleaner thread.
+ *
+ * <p>Most tests write base/delta directories, push a compaction request through
+ * compact / findNextToCompact / markCompacted, run the Cleaner once, and then check the
+ * compaction queue and the directories left on disk.
+ */
+public class TestCleaner extends CompactorTest {
+  public TestCleaner() throws Exception {
+    super();
+  }
+
+  @Test
+  public void nothing() throws Exception {
+    // Test that the whole thing works when there's nothing in the queue. This is just a
+    // survival test.
+    startCleaner(new HiveConf());
+  }
+
+  @Test
+  public void cleanupAfterMajorTableCompaction() throws Exception {
+    Table t = newTable("default", "camtc", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+    addBaseFile(conf, t, null, 25L, 25);
+
+    burnThroughTransactions(25);
+
+    markReadyForCleaning(new CompactionRequest("default", "camtc", CompactionType.MAJOR));
+
+    startCleaner(conf);
+
+    assertNoCompactionsLeft();
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_25", paths.get(0).getName());
+  }
+
+  @Test
+  public void cleanupAfterMajorPartitionCompaction() throws Exception {
+    Table t = newTable("default", "campc", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+    addBaseFile(conf, t, p, 25L, 25);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    markReadyForCleaning(rqst);
+
+    startCleaner(conf);
+
+    assertNoCompactionsLeft();
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, p);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_25", paths.get(0).getName());
+  }
+
+  @Test
+  public void cleanupAfterMinorTableCompaction() throws Exception {
+    Table t = newTable("default", "camitc", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+    addDeltaFile(conf, t, null, 21L, 24L, 4);
+
+    burnThroughTransactions(25);
+
+    markReadyForCleaning(new CompactionRequest("default", "camitc", CompactionType.MINOR));
+
+    startCleaner(conf);
+
+    assertNoCompactionsLeft();
+
+    // Check that the files are removed
+    assertOnlyBaseAndDeltaRemain(getDirectories(conf, t, null), "base_20", "delta_21_24");
+  }
+
+  @Test
+  public void cleanupAfterMinorPartitionCompaction() throws Exception {
+    Table t = newTable("default", "camipc", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+    addDeltaFile(conf, t, p, 21L, 24L, 4);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    markReadyForCleaning(rqst);
+
+    startCleaner(conf);
+
+    assertNoCompactionsLeft();
+
+    // Check that the files are removed
+    assertOnlyBaseAndDeltaRemain(getDirectories(conf, t, p), "base_20", "delta_21_24");
+  }
+
+  @Test
+  public void blockedByLockTable() throws Exception {
+    Table t = newTable("default", "bblt", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+    addDeltaFile(conf, t, null, 21L, 24L, 4);
+
+    burnThroughTransactions(25);
+
+    markReadyForCleaning(new CompactionRequest("default", "bblt", CompactionType.MINOR));
+
+    LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default");
+    comp.setTablename("bblt");
+    acquireLock(comp);
+
+    startCleaner(conf);
+
+    // The held lock should keep the Cleaner from cleaning, so the request must still be queued.
+    ShowCompactResponseElement compact = assertSingleCompactionReadyForCleaning();
+    Assert.assertEquals("bblt", compact.getTablename());
+    Assert.assertEquals(CompactionType.MINOR, compact.getType());
+  }
+
+  @Test
+  public void blockedByLockPartition() throws Exception {
+    Table t = newTable("default", "bblp", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+    addDeltaFile(conf, t, p, 21L, 24L, 4);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "bblp", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    markReadyForCleaning(rqst);
+
+    // A partition-level lock on the partition being cleaned.
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+    comp.setTablename("bblp");
+    comp.setPartitionname("ds=today");
+    acquireLock(comp);
+
+    startCleaner(conf);
+
+    // The held lock should keep the Cleaner from cleaning, so the request must still be queued.
+    ShowCompactResponseElement compact = assertSingleCompactionReadyForCleaning();
+    Assert.assertEquals("bblp", compact.getTablename());
+    Assert.assertEquals("ds=today", compact.getPartitionname());
+    Assert.assertEquals(CompactionType.MINOR, compact.getType());
+  }
+
+  @Test
+  public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception {
+    Table t = newTable("default", "campcnb", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addDeltaFile(conf, t, p, 1L, 22L, 22);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+    addBaseFile(conf, t, p, 25L, 25);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "campcnb", CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    markReadyForCleaning(rqst);
+
+    startCleaner(conf);
+
+    assertNoCompactionsLeft();
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, p);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_25", paths.get(0).getName());
+  }
+
+  @Before
+  public void setUpTxnDb() throws Exception {
+    TxnDbUtil.setConfValues(new HiveConf());
+  }
+
+  // Enqueues the request, claims it as worker "fred", marks it compacted and sets its run-as
+  // user, so that it is waiting for the Cleaner.
+  private void markReadyForCleaning(CompactionRequest rqst) throws Exception {
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+  }
+
+  // Requests the single given lock from the txn handler; the response is not checked.
+  private void acquireLock(LockComponent comp) throws Exception {
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    txnHandler.lock(new LockRequest(components, "me", "localhost"));
+  }
+
+  // Checks there are no compaction requests left in the queue.
+  private void assertNoCompactionsLeft() throws Exception {
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertNull(rsp.getCompacts());
+  }
+
+  // Checks exactly one compaction is queued and it is still "ready for cleaning"; returns it.
+  private ShowCompactResponseElement assertSingleCompactionReadyForCleaning() throws Exception {
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    return compacts.get(0);
+  }
+
+  // Checks that the only entries are the named base and delta directories.
+  private static void assertOnlyBaseAndDeltaRemain(List<Path> paths, String base, String delta) {
+    Assert.assertEquals(2, paths.size());
+    boolean sawBase = false, sawDelta = false;
+    for (Path path : paths) {
+      if (path.getName().equals(base)) sawBase = true;
+      else if (path.getName().equals(delta)) sawDelta = true;
+      else Assert.fail("Unexpected file " + path.getName());
+    }
+    Assert.assertTrue(sawBase);
+    Assert.assertTrue(sawDelta);
+  }
+}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,634 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the compactor Initiator thread.
+ */
+public class TestInitiator extends CompactorTest {
+  private static final String CLASS_NAME = TestInitiator.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  public TestInitiator() throws Exception {
+    super();
+  }
+
+  @Test
+  public void nothing() throws Exception {
+    // Test that the whole thing works when there's nothing in the queue. This is just a
+    // survival test.
+    startInitiator(new HiveConf());
+  }
+
+  @Test
+  public void recoverFailedLocalWorkers() throws Exception {
+    newTable("default", "rflw1", false);
+    txnHandler.compact(new CompactionRequest("default", "rflw1", CompactionType.MINOR));
+
+    newTable("default", "rflw2", false);
+    txnHandler.compact(new CompactionRequest("default", "rflw2", CompactionType.MINOR));
+
+    // Claim one request under this host's name and one under a host that does not exist.
+    txnHandler.findNextToCompact(Worker.hostname() + "-193892");
+    txnHandler.findNextToCompact("nosuchhost-193892");
+
+    startInitiator(new HiveConf());
+
+    // Expected: the unknown host's claim is still "working"; the other is back to "initiated".
+    List<ShowCompactResponseElement> compacts =
+        txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(2, compacts.size());
+    boolean sawInitiated = false;
+    for (ShowCompactResponseElement c : compacts) {
+      if (c.getState().equals("working")) {
+        Assert.assertEquals("nosuchhost-193892", c.getWorkerid());
+      } else if (c.getState().equals("initiated")) {
+        sawInitiated = true;
+      } else {
+        Assert.fail("Unexpected state");
+      }
+    }
+    Assert.assertTrue(sawInitiated);
+  }
+
+  @Test
+  public void recoverFailedRemoteWorkers() throws Exception {
+    newTable("default", "rfrw1", false);
+    txnHandler.compact(new CompactionRequest("default", "rfrw1", CompactionType.MINOR));
+
+    txnHandler.findNextToCompact("nosuchhost-193892");
+
+    // A tiny worker timeout; the assertions below expect the remote claim to be reset.
+    HiveConf conf = new HiveConf();
+    HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L);
+
+    startInitiator(conf);
+
+    List<ShowCompactResponseElement> compacts =
+        txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+  }
+
+  @Test
+  public void majorCompactOnTableTooManyAborts() throws Exception {
+    newTable("default", "mcottma", false);
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+    // One more aborted transaction than the threshold.
+    for (int i = 0; i < 11; i++) {
+      lockThenAbort("mcottma", null, LockLevel.TABLE);
+    }
+
+    startInitiator(conf);
+
+    assertOneInitiatedCompaction("mcottma", null, CompactionType.MAJOR);
+  }
+
+  @Test
+  public void majorCompactOnPartitionTooManyAborts() throws Exception {
+    Table t = newTable("default", "mcoptma", true);
+    newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+    // NOTE(review): these locks use LockLevel.TABLE while naming a partition, whereas the
+    // delta-based partition tests use LockLevel.PARTITION -- confirm this is intended.
+    for (int i = 0; i < 11; i++) {
+      lockThenAbort("mcoptma", "ds=today", LockLevel.TABLE);
+    }
+
+    startInitiator(conf);
+
+    assertOneInitiatedCompaction("mcoptma", "ds=today", CompactionType.MAJOR);
+  }
+
+  @Test
+  public void noCompactOnManyDifferentPartitionAborts() throws Exception {
+    Table t = newTable("default", "ncomdpa", true);
+    for (int i = 0; i < 11; i++) {
+      newPartition(t, "day-" + i);
+    }
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+    // One aborted transaction per partition, so no single partition exceeds the threshold.
+    for (int i = 0; i < 11; i++) {
+      lockThenAbort("ncomdpa", "ds=day-" + i, LockLevel.TABLE);
+    }
+
+    startInitiator(conf);
+
+    assertNoCompactions();
+  }
+
+  @Test
+  public void cleanEmptyAbortedTxns() throws Exception {
+    // Test that we are cleaning aborted transactions with no components left in txn_components.
+    // Put one aborted transaction with an entry in txn_components to make sure we don't
+    // accidentally clean it too.
+    newTable("default", "ceat", false);
+
+    HiveConf conf = new HiveConf();
+
+    lockThenAbort("ceat", null, LockLevel.TABLE);
+
+    // 100 aborted transactions that never took a lock.
+    for (int i = 0; i < 100; i++) {
+      txnHandler.abortTxn(new AbortTxnRequest(openTxn()));
+    }
+    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(101, openTxns.getOpen_txnsSize());
+
+    startInitiator(conf);
+
+    openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(1, openTxns.getOpen_txnsSize());
+  }
+
+  @Test
+  public void noCompactWhenNoCompactSet() throws Exception {
+    Map<String, String> parameters = new HashMap<String, String>(1);
+    parameters.put("NO_AUTO_COMPACTION", "true");
+    newTable("default", "ncwncs", false, parameters);
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+    for (int i = 0; i < 11; i++) {
+      lockThenAbort("ncwncs", null, LockLevel.TABLE);
+    }
+
+    startInitiator(conf);
+
+    assertNoCompactions();
+  }
+
+  @Test
+  public void noCompactWhenCompactAlreadyScheduled() throws Exception {
+    newTable("default", "ncwcas", false);
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+    for (int i = 0; i < 11; i++) {
+      lockThenAbort("ncwcas", null, LockLevel.TABLE);
+    }
+
+    txnHandler.compact(new CompactionRequest("default", "ncwcas", CompactionType.MAJOR));
+
+    List<ShowCompactResponseElement> compacts =
+        txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("ncwcas", compacts.get(0).getTablename());
+
+    startInitiator(conf);
+
+    // The manually scheduled request must still be the only one.
+    assertOneInitiatedCompaction("ncwcas", null, CompactionType.MAJOR);
+  }
+
+  @Test
+  public void compactTableHighDeltaPct() throws Exception {
+    Table t = newTable("default", "cthdp", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+    burnThroughTransactions(23);
+
+    lockThenCommit("cthdp", null, LockLevel.TABLE);
+
+    startInitiator(conf);
+
+    assertOneInitiatedCompaction("cthdp", null, CompactionType.MAJOR);
+  }
+
+  @Test
+  public void compactPartitionHighDeltaPct() throws Exception {
+    Table t = newTable("default", "cphdp", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+
+    burnThroughTransactions(23);
+
+    lockThenCommit("cphdp", "ds=today", LockLevel.PARTITION);
+
+    startInitiator(conf);
+
+    assertOneInitiatedCompaction("cphdp", "ds=today", CompactionType.MAJOR);
+  }
+
+  @Test
+  public void noCompactTableDeltaPctNotHighEnough() throws Exception {
+    Table t = newTable("default", "nctdpnhe", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 50L, 50);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+    burnThroughTransactions(53);
+
+    lockThenCommit("nctdpnhe", null, LockLevel.TABLE);
+
+    startInitiator(conf);
+
+    assertNoCompactions();
+  }
+
+  @Test
+  public void compactTableTooManyDeltas() throws Exception {
+    Table t = newTable("default", "cttmd", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 200L, 200);
+    // Eleven single-transaction deltas: 201 through 211.
+    for (long txn = 201L; txn <= 211L; txn++) {
+      addDeltaFile(conf, t, null, txn, txn, 1);
+    }
+
+    burnThroughTransactions(210);
+
+    lockThenCommit("cttmd", null, LockLevel.TABLE);
+
+    startInitiator(conf);
+
+    assertOneInitiatedCompaction("cttmd", null, CompactionType.MINOR);
+  }
+
+  @Test
+  public void compactPartitionTooManyDeltas() throws Exception {
+    Table t = newTable("default", "cptmd", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 200L, 200);
+    // Eleven single-transaction deltas: 201 through 211.
+    for (long txn = 201L; txn <= 211L; txn++) {
+      addDeltaFile(conf, t, p, txn, txn, 1);
+    }
+
+    burnThroughTransactions(210);
+
+    lockThenCommit("cptmd", "ds=today", LockLevel.PARTITION);
+
+    startInitiator(conf);
+
+    assertOneInitiatedCompaction("cptmd", "ds=today", CompactionType.MINOR);
+  }
+
+  @Test
+  public void noCompactTableNotEnoughDeltas() throws Exception {
+    Table t = newTable("default", "nctned", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 200L, 200);
+    addDeltaFile(conf, t, null, 201L, 205L, 5);
+    addDeltaFile(conf, t, null, 206L, 211L, 6);
+
+    burnThroughTransactions(210);
+
+    lockThenCommit("nctned", null, LockLevel.TABLE);
+
+    startInitiator(conf);
+
+    assertNoCompactions();
+  }
+
+  @Test
+  public void chooseMajorOverMinorWhenBothValid() throws Exception {
+    Table t = newTable("default", "cmomwbv", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 200L, 200);
+    // Eleven 11-transaction deltas: 201-211, 212-222, ..., 311-321.
+    for (long start = 201L; start <= 311L; start += 11) {
+      addDeltaFile(conf, t, null, start, start + 10, 11);
+    }
+
+    burnThroughTransactions(320);
+
+    lockThenCommit("cmomwbv", null, LockLevel.TABLE);
+
+    startInitiator(conf);
+
+    assertOneInitiatedCompaction("cmomwbv", null, CompactionType.MAJOR);
+  }
+
+  @Test
+  public void enoughDeltasNoBase() throws Exception {
+    Table t = newTable("default", "ednb", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addDeltaFile(conf, t, p, 1L, 201L, 200);
+    // Ten single-transaction deltas: 202 through 211.
+    for (long txn = 202L; txn <= 211L; txn++) {
+      addDeltaFile(conf, t, p, txn, txn, 1);
+    }
+
+    burnThroughTransactions(210);
+
+    lockThenCommit("ednb", "ds=today", LockLevel.PARTITION);
+
+    startInitiator(conf);
+
+    assertOneInitiatedCompaction("ednb", "ds=today", CompactionType.MAJOR);
+  }
+
+  @Test
+  public void twoTxnsOnSamePartitionGenerateOneCompactionRequest() throws Exception {
+    Table t = newTable("default", "ttospgocr", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+
+    burnThroughTransactions(23);
+
+    lockThenCommit("ttospgocr", "ds=today", LockLevel.PARTITION);
+    lockThenCommit("ttospgocr", "ds=today", LockLevel.PARTITION);
+
+    startInitiator(conf);
+
+    assertOneInitiatedCompaction("ttospgocr", "ds=today", CompactionType.MAJOR);
+  }
+
+  // TODO test compactions with legacy file types
+
+  @Before
+  public void setUpTxnDb() throws Exception {
+    TxnDbUtil.setConfValues(new HiveConf());
+  }
+
+  /**
+   * Opens a transaction and requests a SHARED_WRITE lock for it on a table in the "default"
+   * database.
+   *
+   * @param tableName table named in the lock component
+   * @param partitionName partition named in the lock component, or null to name none
+   * @param level lock level of the component
+   * @return id of the newly opened transaction
+   */
+  private long openTxnAndLock(String tableName, String partitionName, LockLevel level)
+      throws Exception {
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, level, "default");
+    comp.setTablename(tableName);
+    if (partitionName != null) {
+      comp.setPartitionname(partitionName);
+    }
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    txnHandler.lock(req);
+    return txnid;
+  }
+
+  /** Locks the table/partition in a new transaction, then aborts that transaction. */
+  private void lockThenAbort(String tableName, String partitionName, LockLevel level)
+      throws Exception {
+    txnHandler.abortTxn(new AbortTxnRequest(openTxnAndLock(tableName, partitionName, level)));
+  }
+
+  /** Locks the table/partition in a new transaction, then commits that transaction. */
+  private void lockThenCommit(String tableName, String partitionName, LockLevel level)
+      throws Exception {
+    txnHandler.commitTxn(new CommitTxnRequest(openTxnAndLock(tableName, partitionName, level)));
+  }
+
+  /**
+   * Asserts that showCompact lists exactly one request, in state "initiated", for the given
+   * table (and partition, when non-null) with the given compaction type.
+   */
+  private void assertOneInitiatedCompaction(String tableName, String partitionName,
+      CompactionType type) throws Exception {
+    List<ShowCompactResponseElement> compacts =
+        txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    ShowCompactResponseElement compact = compacts.get(0);
+    Assert.assertEquals("initiated", compact.getState());
+    Assert.assertEquals(tableName, compact.getTablename());
+    if (partitionName != null) {
+      Assert.assertEquals(partitionName, compact.getPartitionname());
+    }
+    Assert.assertEquals(type, compact.getType());
+  }
+
+  /** Asserts that showCompact returns no compaction list at all (null). */
+  private void assertNoCompactions() throws Exception {
+    Assert.assertNull(txnHandler.showCompact(new ShowCompactRequest()).getCompacts());
+  }
+}
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the worker thread and its MR jobs.
+ */
+public class TestWorker extends CompactorTest {
+  private static final String CLASS_NAME = TestWorker.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  /** Builds the fixture; all setup happens in the {@link CompactorTest} constructor. */
+  public TestWorker() throws Exception {
+    super();
+  }
+
+ @Test
+ public void nothing() throws Exception {
+ // Test that the whole things works when there's nothing in the queue. This is just a
+ // survival test.
+ startWorker(new HiveConf());
+ }
+
+ @Test
+ public void stringableMap() throws Exception {
+ // Empty map case
+ CompactorMR.StringableMap m = new CompactorMR.StringableMap(new HashMap<String, String>());
+ String s = m.toString();
+ Assert.assertEquals("0:", s);
+ m = new CompactorMR.StringableMap(s);
+ Assert.assertEquals(0, m.size());
+
+ Map<String, String> base = new HashMap<String, String>();
+ base.put("mary", "poppins");
+ base.put("bert", null);
+ base.put(null, "banks");
+ m = new CompactorMR.StringableMap(base);
+ s = m.toString();
+ m = new CompactorMR.StringableMap(s);
+ Assert.assertEquals(3, m.size());
+ Map<String, Boolean> saw = new HashMap<String, Boolean>(3);
+ saw.put("mary", false);
+ saw.put("bert", false);
+ saw.put(null, false);
+ for (Map.Entry<String, String> e : m.entrySet()) {
+ saw.put(e.getKey(), true);
+ if ("mary".equals(e.getKey())) Assert.assertEquals("poppins", e.getValue());
+ else if ("bert".equals(e.getKey())) Assert.assertNull(e.getValue());
+ else if (null == e.getKey()) Assert.assertEquals("banks", e.getValue());
+ else Assert.fail("Unexpected value " + e.getKey());
+ }
+ Assert.assertEquals(3, saw.size());
+ Assert.assertTrue(saw.get("mary"));
+ Assert.assertTrue(saw.get("bert"));
+ Assert.assertTrue(saw.get(null));
+ }
+
+ @Test
+ public void stringableList() throws Exception {
+ // Empty list case
+ CompactorMR.StringableList ls = new CompactorMR.StringableList();
+ String s = ls.toString();
+ Assert.assertEquals("0:", s);
+ ls = new CompactorMR.StringableList(s);
+ Assert.assertEquals(0, ls.size());
+
+ ls = new CompactorMR.StringableList();
+ ls.add(new Path("/tmp"));
+ ls.add(new Path("/usr"));
+ s = ls.toString();
+ Assert.assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s,
+ "2:4:/tmp4:/usr".equals(s) || "2:4:/usr4:/tmp".equals(s));
+ ls = new CompactorMR.StringableList(s);
+ Assert.assertEquals(2, ls.size());
+ boolean sawTmp = false, sawUsr = false;
+ for (Path p : ls) {
+ if ("/tmp".equals(p.toString())) sawTmp = true;
+ else if ("/usr".equals(p.toString())) sawUsr = true;
+ else Assert.fail("Unexpected path " + p.toString());
+ }
+ Assert.assertTrue(sawTmp);
+ Assert.assertTrue(sawUsr);
+ }
+
+  @Test
+  public void inputSplit() throws Exception {
+    String basename = "/warehouse/foo/base_1";
+    String delta1 = "/warehouse/foo/delta_2_3";
+    String delta2 = "/warehouse/foo/delta_4_7";
+
+    HiveConf conf = new HiveConf();
+    FileSystem fs = FileSystem.get(conf);
+    Path file = writeSplitInputFile(fs);
+    try {
+      List<Path> files = new ArrayList<Path>(1);
+      files.add(file);
+      Path[] deltas = new Path[] {new Path(delta1), new Path(delta2)};
+
+      CompactorMR.CompactorInputSplit split =
+          new CompactorMR.CompactorInputSplit(conf, 3, files, new Path(basename), deltas);
+
+      // 10 lines x 52 bytes each.
+      Assert.assertEquals(520L, split.getLength());
+      String[] locations = split.getLocations();
+      Assert.assertEquals(1, locations.length);
+      Assert.assertEquals("localhost", locations[0]);
+
+      split = roundTripSplit(split);
+
+      Assert.assertEquals(3, split.getBucket());
+      Assert.assertEquals(basename, split.getBaseDir().toString());
+      deltas = split.getDeltaDirs();
+      Assert.assertEquals(2, deltas.length);
+      Assert.assertEquals(delta1, deltas[0].toString());
+      Assert.assertEquals(delta2, deltas[1].toString());
+    } finally {
+      // Don't leave the scratch file behind in java.io.tmpdir.
+      fs.delete(file, false);
+    }
+  }
+
+  @Test
+  public void inputSplitNullBase() throws Exception {
+    String delta1 = "/warehouse/foo/delta_2_3";
+    String delta2 = "/warehouse/foo/delta_4_7";
+
+    HiveConf conf = new HiveConf();
+    FileSystem fs = FileSystem.get(conf);
+    Path file = writeSplitInputFile(fs);
+    try {
+      List<Path> files = new ArrayList<Path>(1);
+      files.add(file);
+      Path[] deltas = new Path[] {new Path(delta1), new Path(delta2)};
+
+      // A null base directory must survive serialization as null.
+      CompactorMR.CompactorInputSplit split =
+          roundTripSplit(new CompactorMR.CompactorInputSplit(conf, 3, files, null, deltas));
+
+      Assert.assertEquals(3, split.getBucket());
+      Assert.assertNull(split.getBaseDir());
+      deltas = split.getDeltaDirs();
+      Assert.assertEquals(2, deltas.length);
+      Assert.assertEquals(delta1, deltas[0].toString());
+      Assert.assertEquals(delta2, deltas[1].toString());
+    } finally {
+      fs.delete(file, false);
+    }
+  }
+
+  /**
+   * Writes the scratch input file used by the split tests: ten copies of a 52-byte line
+   * (520 bytes), stored under java.io.tmpdir. The stream is closed even if a write fails.
+   *
+   * @param fs file system to create the file on
+   * @return path of the file that was written
+   */
+  private Path writeSplitInputFile(FileSystem fs) throws IOException {
+    Path file = new Path(System.getProperty("java.io.tmpdir") +
+        System.getProperty("file.separator") + "newWriteInputSplitTest");
+    FSDataOutputStream os = fs.create(file);
+    try {
+      for (int i = 0; i < 10; i++) {
+        os.writeBytes("mary had a little lamb its fleece was white as snow\n");
+      }
+    } finally {
+      os.close();
+    }
+    return file;
+  }
+
+  /**
+   * Serializes a split with write() and deserializes it into a fresh instance with readFields().
+   *
+   * @param split split to serialize
+   * @return a new split populated from the serialized bytes
+   */
+  private CompactorMR.CompactorInputSplit roundTripSplit(CompactorMR.CompactorInputSplit split)
+      throws IOException {
+    ByteArrayOutputStream buf = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(buf);
+    split.write(out);
+
+    CompactorMR.CompactorInputSplit copy = new CompactorMR.CompactorInputSplit();
+    DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+    copy.readFields(in);
+    return copy;
+  }
+
+  @Test
+  public void sortedTable() throws Exception {
+    // Declare a sort order on column "b" for the table.
+    List<Order> sortCols = new ArrayList<Order>(1);
+    sortCols.add(new Order("b", 1));
+    Table table = newTable("default", "st", false, new HashMap<String, String>(), sortCols);
+
+    HiveConf conf = new HiveConf();
+    addBaseFile(conf, table, null, 20L, 20);
+    addDeltaFile(conf, table, null, 21L, 22L, 2);
+    addDeltaFile(conf, table, null, 23L, 24L, 2);
+    addDeltaFile(conf, table, null, 21L, 24L, 4);
+    burnThroughTransactions(25);
+
+    txnHandler.compact(new CompactionRequest("default", "st", CompactionType.MINOR));
+    startWorker(new HiveConf());
+
+    // The worker is expected to leave all four directories in place for a sorted table.
+    FileStatus[] listing = FileSystem.get(conf).listStatus(new Path(table.getSd().getLocation()));
+    Assert.assertEquals(4, listing.length);
+  }
+
+  @Test
+  public void sortedPartition() throws Exception {
+    // Declare a sort order on column "b" for both the table and the partition.
+    List<Order> sortCols = new ArrayList<Order>(1);
+    sortCols.add(new Order("b", 1));
+    Table table = newTable("default", "sp", true, new HashMap<String, String>(), sortCols);
+    Partition part = newPartition(table, "today", sortCols);
+
+    HiveConf conf = new HiveConf();
+    addBaseFile(conf, table, part, 20L, 20);
+    addDeltaFile(conf, table, part, 21L, 22L, 2);
+    addDeltaFile(conf, table, part, 23L, 24L, 2);
+    addDeltaFile(conf, table, part, 21L, 24L, 4);
+    burnThroughTransactions(25);
+
+    CompactionRequest request = new CompactionRequest("default", "sp", CompactionType.MINOR);
+    request.setPartitionname("ds=today");
+    txnHandler.compact(request);
+    startWorker(new HiveConf());
+
+    // The worker is expected to leave all four directories in place for a sorted partition.
+    FileStatus[] listing = FileSystem.get(conf).listStatus(new Path(part.getSd().getLocation()));
+    Assert.assertEquals(4, listing.length);
+  }
+
+ @Test
+ public void minorTableWithBase() throws Exception {
+   LOG.debug("Starting minorTableWithBase");
+   Table t = newTable("default", "mtwb", false);
+
+   HiveConf conf = new HiveConf();
+
+   // A base and two deltas (21-22 and 23-24) that a minor compaction should merge.
+   addBaseFile(conf, t, null, 20L, 20);
+   addDeltaFile(conf, t, null, 21L, 22L, 2);
+   addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+   burnThroughTransactions(25);
+
+   CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR);
+   txnHandler.compact(rqst);
+
+   startWorker(conf);
+
+   // Exactly one compaction was queued and it must have reached "ready for cleaning".
+   ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+   List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+   Assert.assertEquals(1, compacts.size());
+   Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+   // Cleaning has not run yet, so the original base and two deltas are still present
+   // alongside the new compacted delta: 4 entries in total.
+   FileSystem fs = FileSystem.get(conf);
+   FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+   // Route the directory listing through the test logger instead of stdout.
+   for (FileStatus entry : stat) {
+     LOG.debug("Table directory entry after compaction: " + entry.getPath());
+   }
+   Assert.assertEquals(4, stat.length);
+
+   // Find the new delta directory and make sure it has the right contents.
+   // listStatus gives no ordering guarantee, so bucket names are matched by pattern.
+   boolean sawNewDelta = false;
+   for (int i = 0; i < stat.length; i++) {
+     if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+       sawNewDelta = true;
+       FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+       Assert.assertEquals(2, buckets.length);
+       Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertEquals(208L, buckets[0].getLen());
+       Assert.assertEquals(208L, buckets[1].getLen());
+     } else {
+       LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
+     }
+   }
+   Assert.assertTrue(sawNewDelta);
+ }
+
+ @Test
+ public void minorPartitionWithBase() throws Exception {
+   Table table = newTable("default", "mpwb", true);
+   Partition partition = newPartition(table, "today");
+   HiveConf hiveConf = new HiveConf();
+
+   // A base and two deltas (21-22 and 23-24) in partition ds=today.
+   addBaseFile(hiveConf, table, partition, 20L, 20);
+   addDeltaFile(hiveConf, table, partition, 21L, 22L, 2);
+   addDeltaFile(hiveConf, table, partition, 23L, 24L, 2);
+
+   burnThroughTransactions(25);
+
+   CompactionRequest request = new CompactionRequest("default", "mpwb", CompactionType.MINOR);
+   request.setPartitionname("ds=today");
+   txnHandler.compact(request);
+
+   startWorker(new HiveConf());
+
+   // Exactly one compaction was queued and it must have reached "ready for cleaning".
+   List<ShowCompactResponseElement> queued =
+       txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+   Assert.assertEquals(1, queued.size());
+   Assert.assertEquals("ready for cleaning", queued.get(0).getState());
+
+   // Base, two original deltas and the new delta_0000021_0000024: 4 entries.
+   FileSystem fileSystem = FileSystem.get(hiveConf);
+   FileStatus[] entries = fileSystem.listStatus(new Path(partition.getSd().getLocation()));
+   Assert.assertEquals(4, entries.length);
+
+   // Locate the compacted delta and verify its two bucket files and their sizes.
+   boolean foundCompactedDelta = false;
+   for (FileStatus entry : entries) {
+     if (entry.getPath().getName().equals("delta_0000021_0000024")) {
+       foundCompactedDelta = true;
+       FileStatus[] bucketFiles = fileSystem.listStatus(entry.getPath());
+       Assert.assertEquals(2, bucketFiles.length);
+       Assert.assertTrue(bucketFiles[0].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertTrue(bucketFiles[1].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertEquals(208L, bucketFiles[0].getLen());
+       Assert.assertEquals(208L, bucketFiles[1].getLen());
+     } else {
+       LOG.debug("This is not the delta file you are looking for " + entry.getPath().getName());
+     }
+   }
+   Assert.assertTrue(foundCompactedDelta);
+ }
+
+ @Test
+ public void minorTableNoBase() throws Exception {
+   // Log the name of this test (the previous message wrongly named minorTableWithBase).
+   LOG.debug("Starting minorTableNoBase");
+   Table t = newTable("default", "mtnb", false);
+
+   HiveConf conf = new HiveConf();
+
+   // No base at all: only two deltas (1-2 and 3-4).
+   addDeltaFile(conf, t, null, 1L, 2L, 2);
+   addDeltaFile(conf, t, null, 3L, 4L, 2);
+
+   burnThroughTransactions(5);
+
+   CompactionRequest rqst = new CompactionRequest("default", "mtnb", CompactionType.MINOR);
+   txnHandler.compact(rqst);
+
+   startWorker(new HiveConf());
+
+   // Exactly one compaction was queued and it must have reached "ready for cleaning".
+   ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+   List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+   Assert.assertEquals(1, compacts.size());
+   Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+   // The two original deltas plus the new compacted delta: 3 entries.
+   FileSystem fs = FileSystem.get(conf);
+   FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+   Assert.assertEquals(3, stat.length);
+
+   // Find the new delta directory and make sure it has the right contents.
+   boolean sawNewDelta = false;
+   for (int i = 0; i < stat.length; i++) {
+     if (stat[i].getPath().getName().equals("delta_0000001_0000004")) {
+       sawNewDelta = true;
+       FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+       Assert.assertEquals(2, buckets.length);
+       Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertEquals(208L, buckets[0].getLen());
+       Assert.assertEquals(208L, buckets[1].getLen());
+     } else {
+       LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
+     }
+   }
+   Assert.assertTrue(sawNewDelta);
+ }
+
+ @Test
+ public void majorTableWithBase() throws Exception {
+   LOG.debug("Starting majorTableWithBase");
+   Table table = newTable("default", "matwb", false);
+
+   HiveConf hiveConf = new HiveConf();
+
+   // A base and two deltas (21-22 and 23-24) to be folded into a new base.
+   addBaseFile(hiveConf, table, null, 20L, 20);
+   addDeltaFile(hiveConf, table, null, 21L, 22L, 2);
+   addDeltaFile(hiveConf, table, null, 23L, 24L, 2);
+
+   burnThroughTransactions(25);
+
+   CompactionRequest request = new CompactionRequest("default", "matwb", CompactionType.MAJOR);
+   txnHandler.compact(request);
+
+   startWorker(new HiveConf());
+
+   // Exactly one compaction was queued and it must have reached "ready for cleaning".
+   List<ShowCompactResponseElement> queued =
+       txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+   Assert.assertEquals(1, queued.size());
+   Assert.assertEquals("ready for cleaning", queued.get(0).getState());
+
+   // Old base, two deltas and the new base_0000024: 4 entries.
+   FileSystem fileSystem = FileSystem.get(hiveConf);
+   FileStatus[] entries = fileSystem.listStatus(new Path(table.getSd().getLocation()));
+   Assert.assertEquals(4, entries.length);
+
+   // Locate the new base directory and verify its two bucket files and their sizes.
+   boolean foundNewBase = false;
+   for (FileStatus entry : entries) {
+     if (entry.getPath().getName().equals("base_0000024")) {
+       foundNewBase = true;
+       FileStatus[] bucketFiles = fileSystem.listStatus(entry.getPath());
+       Assert.assertEquals(2, bucketFiles.length);
+       Assert.assertTrue(bucketFiles[0].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertTrue(bucketFiles[1].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertEquals(1248L, bucketFiles[0].getLen());
+       Assert.assertEquals(1248L, bucketFiles[1].getLen());
+     } else {
+       LOG.debug("This is not the file you are looking for " + entry.getPath().getName());
+     }
+   }
+   Assert.assertTrue(foundNewBase);
+ }
+
+ @Test
+ public void majorPartitionWithBase() throws Exception {
+   LOG.debug("Starting majorPartitionWithBase");
+   Table table = newTable("default", "mapwb", true);
+   Partition partition = newPartition(table, "today");
+   HiveConf hiveConf = new HiveConf();
+
+   // A base and two deltas (21-22 and 23-24) in partition ds=today.
+   addBaseFile(hiveConf, table, partition, 20L, 20);
+   addDeltaFile(hiveConf, table, partition, 21L, 22L, 2);
+   addDeltaFile(hiveConf, table, partition, 23L, 24L, 2);
+
+   burnThroughTransactions(25);
+
+   CompactionRequest request = new CompactionRequest("default", "mapwb", CompactionType.MAJOR);
+   request.setPartitionname("ds=today");
+   txnHandler.compact(request);
+
+   startWorker(new HiveConf());
+
+   // Exactly one compaction was queued and it must have reached "ready for cleaning".
+   List<ShowCompactResponseElement> queued =
+       txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+   Assert.assertEquals(1, queued.size());
+   Assert.assertEquals("ready for cleaning", queued.get(0).getState());
+
+   // Old base, two deltas and the new base_0000024: 4 entries.
+   FileSystem fileSystem = FileSystem.get(hiveConf);
+   FileStatus[] entries = fileSystem.listStatus(new Path(partition.getSd().getLocation()));
+   Assert.assertEquals(4, entries.length);
+
+   // Locate the new base directory and verify its two bucket files and their sizes.
+   boolean foundNewBase = false;
+   for (FileStatus entry : entries) {
+     if (entry.getPath().getName().equals("base_0000024")) {
+       foundNewBase = true;
+       FileStatus[] bucketFiles = fileSystem.listStatus(entry.getPath());
+       Assert.assertEquals(2, bucketFiles.length);
+       Assert.assertTrue(bucketFiles[0].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertTrue(bucketFiles[1].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertEquals(1248L, bucketFiles[0].getLen());
+       Assert.assertEquals(1248L, bucketFiles[1].getLen());
+     } else {
+       LOG.debug("This is not the file you are looking for " + entry.getPath().getName());
+     }
+   }
+   Assert.assertTrue(foundNewBase);
+ }
+
+ @Test
+ public void majorTableNoBase() throws Exception {
+   LOG.debug("Starting majorTableNoBase");
+   Table table = newTable("default", "matnb", false);
+
+   HiveConf hiveConf = new HiveConf();
+
+   // No base at all: only two deltas (1-2 and 3-4).
+   addDeltaFile(hiveConf, table, null, 1L, 2L, 2);
+   addDeltaFile(hiveConf, table, null, 3L, 4L, 2);
+
+   burnThroughTransactions(5);
+
+   CompactionRequest request = new CompactionRequest("default", "matnb", CompactionType.MAJOR);
+   txnHandler.compact(request);
+
+   startWorker(new HiveConf());
+
+   // Exactly one compaction was queued and it must have reached "ready for cleaning".
+   List<ShowCompactResponseElement> queued =
+       txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+   Assert.assertEquals(1, queued.size());
+   Assert.assertEquals("ready for cleaning", queued.get(0).getState());
+
+   // Two original deltas plus the new base_0000004: 3 entries.
+   FileSystem fileSystem = FileSystem.get(hiveConf);
+   FileStatus[] entries = fileSystem.listStatus(new Path(table.getSd().getLocation()));
+   Assert.assertEquals(3, entries.length);
+
+   // Locate the new base directory and verify its two bucket files and their sizes.
+   boolean foundNewBase = false;
+   for (FileStatus entry : entries) {
+     if (entry.getPath().getName().equals("base_0000004")) {
+       foundNewBase = true;
+       FileStatus[] bucketFiles = fileSystem.listStatus(entry.getPath());
+       Assert.assertEquals(2, bucketFiles.length);
+       Assert.assertTrue(bucketFiles[0].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertTrue(bucketFiles[1].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertEquals(208L, bucketFiles[0].getLen());
+       Assert.assertEquals(208L, bucketFiles[1].getLen());
+     } else {
+       LOG.debug("This is not the file you are looking for " + entry.getPath().getName());
+     }
+   }
+   Assert.assertTrue(foundNewBase);
+ }
+
+ @Test
+ public void majorTableLegacy() throws Exception {
+   LOG.debug("Starting majorTableLegacy");
+   Table table = newTable("default", "matl", false);
+
+   HiveConf hiveConf = new HiveConf();
+
+   // Pre-ACID ("legacy") data via addLegacyFile, followed by two deltas (21-22, 23-24).
+   addLegacyFile(hiveConf, table, null, 20);
+   addDeltaFile(hiveConf, table, null, 21L, 22L, 2);
+   addDeltaFile(hiveConf, table, null, 23L, 24L, 2);
+
+   burnThroughTransactions(25);
+
+   CompactionRequest request = new CompactionRequest("default", "matl", CompactionType.MAJOR);
+   txnHandler.compact(request);
+
+   startWorker(new HiveConf());
+
+   // Exactly one compaction was queued and it must have reached "ready for cleaning".
+   List<ShowCompactResponseElement> queued =
+       txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+   Assert.assertEquals(1, queued.size());
+   Assert.assertEquals("ready for cleaning", queued.get(0).getState());
+
+   // NOTE(review): the number of directory entries is deliberately not asserted for the
+   // legacy layout (a previous assertEquals(4, ...) was disabled) -- confirm the expected
+   // count and add the check back.
+   FileSystem fileSystem = FileSystem.get(hiveConf);
+   FileStatus[] entries = fileSystem.listStatus(new Path(table.getSd().getLocation()));
+
+   // Locate the new base directory and verify its two bucket files and their sizes.
+   boolean foundNewBase = false;
+   for (FileStatus entry : entries) {
+     if (entry.getPath().getName().equals("base_0000024")) {
+       foundNewBase = true;
+       FileStatus[] bucketFiles = fileSystem.listStatus(entry.getPath());
+       Assert.assertEquals(2, bucketFiles.length);
+       Assert.assertTrue(bucketFiles[0].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertTrue(bucketFiles[1].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertEquals(1248L, bucketFiles[0].getLen());
+       Assert.assertEquals(1248L, bucketFiles[1].getLen());
+     } else {
+       LOG.debug("This is not the file you are looking for " + entry.getPath().getName());
+     }
+   }
+   Assert.assertTrue(foundNewBase);
+ }
+
+ @Test
+ public void minorTableLegacy() throws Exception {
+   LOG.debug("Starting minorTableLegacy");
+   Table table = newTable("default", "mtl", false);
+
+   HiveConf hiveConf = new HiveConf();
+
+   // Pre-ACID ("legacy") data via addLegacyFile, followed by two deltas (21-22, 23-24).
+   addLegacyFile(hiveConf, table, null, 20);
+   addDeltaFile(hiveConf, table, null, 21L, 22L, 2);
+   addDeltaFile(hiveConf, table, null, 23L, 24L, 2);
+
+   burnThroughTransactions(25);
+
+   CompactionRequest request = new CompactionRequest("default", "mtl", CompactionType.MINOR);
+   txnHandler.compact(request);
+
+   startWorker(new HiveConf());
+
+   // Exactly one compaction was queued and it must have reached "ready for cleaning".
+   List<ShowCompactResponseElement> queued =
+       txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+   Assert.assertEquals(1, queued.size());
+   Assert.assertEquals("ready for cleaning", queued.get(0).getState());
+
+   // Only the presence and bucket names of the compacted delta are checked here;
+   // neither the entry count nor the bucket sizes are asserted.
+   FileSystem fileSystem = FileSystem.get(hiveConf);
+   FileStatus[] entries = fileSystem.listStatus(new Path(table.getSd().getLocation()));
+
+   boolean foundCompactedDelta = false;
+   for (FileStatus entry : entries) {
+     if (entry.getPath().getName().equals("delta_0000021_0000024")) {
+       foundCompactedDelta = true;
+       FileStatus[] bucketFiles = fileSystem.listStatus(entry.getPath());
+       Assert.assertEquals(2, bucketFiles.length);
+       Assert.assertTrue(bucketFiles[0].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertTrue(bucketFiles[1].getPath().getName().matches("bucket_0000[01]"));
+     } else {
+       LOG.debug("This is not the file you are looking for " + entry.getPath().getName());
+     }
+   }
+   Assert.assertTrue(foundCompactedDelta);
+ }
+
+ @Test
+ public void majorPartitionWithBaseMissingBuckets() throws Exception {
+   Table table = newTable("default", "mapwbmb", true);
+   Partition partition = newPartition(table, "today");
+   HiveConf hiveConf = new HiveConf();
+
+   // The base and the first delta use the extended helper overloads (extra "2, false"
+   // arguments); presumably these leave one bucket file out -- see addBaseFile/addDeltaFile.
+   addBaseFile(hiveConf, table, partition, 20L, 20, 2, false);
+   addDeltaFile(hiveConf, table, partition, 21L, 22L, 2, 2, false);
+   addDeltaFile(hiveConf, table, partition, 23L, 24L, 2);
+
+   burnThroughTransactions(25);
+
+   CompactionRequest request = new CompactionRequest("default", "mapwbmb", CompactionType.MAJOR);
+   request.setPartitionname("ds=today");
+   txnHandler.compact(request);
+
+   startWorker(new HiveConf());
+
+   // Exactly one compaction was queued and it must have reached "ready for cleaning".
+   List<ShowCompactResponseElement> queued =
+       txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+   Assert.assertEquals(1, queued.size());
+   Assert.assertEquals("ready for cleaning", queued.get(0).getState());
+
+   // Old base, two deltas and the new base_0000024: 4 entries.
+   FileSystem fileSystem = FileSystem.get(hiveConf);
+   FileStatus[] entries = fileSystem.listStatus(new Path(partition.getSd().getLocation()));
+   Assert.assertEquals(4, entries.length);
+
+   // Locate the new base directory and verify its two bucket files.
+   boolean foundNewBase = false;
+   for (FileStatus entry : entries) {
+     if (entry.getPath().getName().equals("base_0000024")) {
+       foundNewBase = true;
+       FileStatus[] bucketFiles = fileSystem.listStatus(entry.getPath());
+       Assert.assertEquals(2, bucketFiles.length);
+       Assert.assertTrue(bucketFiles[0].getPath().getName().matches("bucket_0000[01]"));
+       Assert.assertTrue(bucketFiles[1].getPath().getName().matches("bucket_0000[01]"));
+       // bucket_00000 must be the small file (104 bytes) and bucket_00001 the large one
+       // (1248 bytes); listStatus order is not fixed, so accept either listing order.
+       boolean zeroListedFirst =
+           "bucket_00000".equals(bucketFiles[0].getPath().getName())
+               && 104L == bucketFiles[0].getLen()
+               && "bucket_00001".equals(bucketFiles[1].getPath().getName())
+               && 1248L == bucketFiles[1].getLen();
+       boolean zeroListedSecond =
+           "bucket_00000".equals(bucketFiles[1].getPath().getName())
+               && 104L == bucketFiles[1].getLen()
+               && "bucket_00001".equals(bucketFiles[0].getPath().getName())
+               && 1248L == bucketFiles[0].getLen();
+       Assert.assertTrue(zeroListedFirst || zeroListedSecond);
+     } else {
+       LOG.debug("This is not the file you are looking for " + entry.getPath().getName());
+     }
+   }
+   Assert.assertTrue(foundNewBase);
+ }
+
+ @Before
+ public void setUpTxnDb() throws Exception {
+ TxnDbUtil.setConfValues(new HiveConf());
+ }
+}