Posted to commits@hive.apache.org by om...@apache.org on 2014/04/11 00:32:21 UTC

svn commit: r1586488 [2/2] - in /hive/trunk: metastore/src/java/org/apache/hadoop/hive/metastore/ metastore/src/java/org/apache/hadoop/hive/metastore/txn/ metastore/src/test/org/apache/hadoop/hive/metastore/txn/ ql/ ql/src/java/org/apache/hadoop/hive/q...

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Progressable;
+import org.apache.thrift.TException;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Stack;
+
+/**
+ * Superclass for all of the compactor test modules.
+ */
+public abstract class CompactorTest {
+  static final private String CLASS_NAME = CompactorTest.class.getName();
+  static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected CompactionTxnHandler txnHandler;
+  protected IMetaStoreClient ms;
+  protected long sleepTime = 1000;
+
+  private MetaStoreThread.BooleanPointer stop = new MetaStoreThread.BooleanPointer();
+  private File tmpdir;
+
+  protected CompactorTest() throws Exception {
+    HiveConf conf = new HiveConf();
+    TxnDbUtil.setConfValues(conf);
+    TxnDbUtil.cleanDb();
+    ms = new HiveMetaStoreClient(conf);
+    txnHandler = new CompactionTxnHandler(conf);
+    tmpdir = new File(System.getProperty("java.io.tmpdir") +
+        System.getProperty("file.separator") + "compactor_test_tables");
+    tmpdir.mkdir();
+    tmpdir.deleteOnExit();
+  }
+
+  protected void startInitiator(HiveConf conf) throws Exception {
+    startThread('i', conf);
+  }
+
+  protected void startWorker(HiveConf conf) throws Exception {
+    startThread('w', conf);
+  }
+
+  protected void startCleaner(HiveConf conf) throws Exception {
+    startThread('c', conf);
+  }
+
+  protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException {
+    return newTable(dbName, tableName, partitioned, new HashMap<String, String>(), null);
+  }
+
+  protected Table newTable(String dbName, String tableName, boolean partitioned,
+                           Map<String, String> parameters)  throws TException {
+    return newTable(dbName, tableName, partitioned, parameters, null);
+
+  }
+
+  protected Table newTable(String dbName, String tableName, boolean partitioned,
+                           Map<String, String> parameters, List<Order> sortCols)
+      throws  TException {
+    Table table = new Table();
+    table.setTableName(tableName);
+    table.setDbName(dbName);
+    table.setOwner("me");
+    table.setSd(newStorageDescriptor(getLocation(tableName, null), sortCols));
+    List<FieldSchema> partKeys = new ArrayList<FieldSchema>(1);
+    if (partitioned) {
+      partKeys.add(new FieldSchema("ds", "string", "no comment"));
+      table.setPartitionKeys(partKeys);
+    }
+
+    table.setParameters(parameters);
+
+    ms.createTable(table);
+    return table;
+  }
+
+  protected Partition newPartition(Table t, String value) throws Exception {
+    return newPartition(t, value, null);
+  }
+
+  protected Partition newPartition(Table t, String value, List<Order> sortCols) throws Exception {
+    Partition part = new Partition();
+    part.addToValues(value);
+    part.setDbName(t.getDbName());
+    part.setTableName(t.getTableName());
+    part.setSd(newStorageDescriptor(getLocation(t.getTableName(), value), sortCols));
+    part.setParameters(new HashMap<String, String>());
+    ms.add_partition(part);
+    return part;
+  }
+
+  protected long openTxn() throws MetaException {
+    List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, System.getProperty("user.name"),
+        Worker.hostname())).getTxn_ids();
+    return txns.get(0);
+  }
+
+  protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
+                              int numRecords) throws Exception{
+    addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
+  }
+
+  protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn,
+                             int numRecords) throws Exception{
+    addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
+  }
+
+  protected void addLegacyFile(HiveConf conf, Table t, Partition p,
+                               int numRecords) throws Exception {
+    addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, 2, true);
+  }
+
+  protected void addDeltaFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
+                              int numRecords, int numBuckets, boolean allBucketsPresent)
+      throws Exception {
+    addFile(conf, t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
+  }
+
+  protected void addBaseFile(HiveConf conf, Table t, Partition p, long maxTxn,
+                             int numRecords, int numBuckets, boolean allBucketsPresent)
+      throws Exception {
+    addFile(conf, t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent);
+  }
+
+  protected void addLegacyFile(HiveConf conf, Table t, Partition p,
+                               int numRecords, int numBuckets, boolean allBucketsPresent)
+      throws Exception {
+    addFile(conf, t, p, 0, 0, numRecords, FileType.LEGACY, numBuckets, allBucketsPresent);
+  }
+
+  protected List<Path> getDirectories(HiveConf conf, Table t, Partition p) throws Exception {
+    String partValue = (p == null) ? null : p.getValues().get(0);
+    String location = getLocation(t.getTableName(), partValue);
+    Path dir = new Path(location);
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stats = fs.listStatus(dir);
+    List<Path> paths = new ArrayList<Path>(stats.length);
+    for (int i = 0; i < stats.length; i++) paths.add(stats[i].getPath());
+    return paths;
+  }
+
+  protected void burnThroughTransactions(int num) throws MetaException, NoSuchTxnException, TxnAbortedException {
+    OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost"));
+    for (long tid : rsp.getTxn_ids()) txnHandler.commitTxn(new CommitTxnRequest(tid));
+  }
+
+  private StorageDescriptor newStorageDescriptor(String location, List<Order> sortCols) {
+    StorageDescriptor sd = new StorageDescriptor();
+    List<FieldSchema> cols = new ArrayList<FieldSchema>(2);
+    cols.add(new FieldSchema("a", "varchar(25)", "still no comment"));
+    cols.add(new FieldSchema("b", "int", "comment"));
+    sd.setCols(cols);
+    sd.setLocation(location);
+    sd.setInputFormat(MockInputFormat.class.getName());
+    sd.setOutputFormat(MockOutputFormat.class.getName());
+    sd.setNumBuckets(1);
+    SerDeInfo serde = new SerDeInfo();
+    serde.setSerializationLib(LazySimpleSerDe.class.getName());
+    sd.setSerdeInfo(serde);
+    List<String> bucketCols = new ArrayList<String>(1);
+    bucketCols.add("a");
+    sd.setBucketCols(bucketCols);
+
+    if (sortCols != null) {
+      sd.setSortCols(sortCols);
+    }
+    return sd;
+  }
+
+  // I can't do this with @Before because I want to be able to control the configuration
+  // provided to each test.
+  private void startThread(char type, HiveConf conf) throws Exception {
+    TxnDbUtil.setConfValues(conf);
+    CompactorThread t = null;
+    switch (type) {
+      case 'i': t = new Initiator(); break;
+      case 'w': t = new Worker(); break;
+      case 'c': t = new Cleaner(); break;
+      default: throw new RuntimeException("Unknown thread type: " + type);
+    }
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(conf);
+    stop.boolVal = true;
+    t.init(stop);
+    t.run();
+  }
+
+  private String getLocation(String tableName, String partValue) {
+    String location =  tmpdir.getAbsolutePath() +
+      System.getProperty("file.separator") + tableName;
+    if (partValue != null) {
+      location += System.getProperty("file.separator") + "ds=" + partValue;
+    }
+    return location;
+  }
+
+  private enum FileType {BASE, DELTA, LEGACY};
+
+  private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxTxn,
+                       int numRecords,  FileType type, int numBuckets,
+                       boolean allBucketsPresent) throws Exception {
+    String partValue = (p == null) ? null : p.getValues().get(0);
+    Path location = new Path(getLocation(t.getTableName(), partValue));
+    String filename = null;
+    switch (type) {
+      case BASE: filename = "base_" + maxTxn; break;
+      case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
+      case LEGACY: break; // handled below
+    }
+
+    FileSystem fs = FileSystem.get(conf);
+    for (int bucket = 0; bucket < numBuckets; bucket++) {
+      if (bucket == 0 && !allBucketsPresent) continue; // skip one
+      Path partFile = null;
+      if (type == FileType.LEGACY) {
+        partFile = new Path(location, String.format(AcidUtils.BUCKET_DIGITS, bucket) + "_0");
+      } else {
+        Path dir = new Path(location, filename);
+        fs.mkdirs(dir);
+        partFile = AcidUtils.createBucketFile(dir, bucket);
+      }
+      FSDataOutputStream out = fs.create(partFile);
+      for (int i = 0; i < numRecords; i++) {
+        RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i);
+        ri.write(out);
+        out.writeBytes("mary had a little lamb its fleece was white as snow\n");
+      }
+      out.close();
+    }
+  }
+
+  static class MockInputFormat implements AcidInputFormat<Text> {
+
+    @Override
+    public AcidInputFormat.RowReader<Text> getReader(InputSplit split,
+                                                          Options options) throws
+        IOException {
+      return null;
+    }
+
+    @Override
+    public RawReader<Text> getRawReader(Configuration conf, boolean collapseEvents, int bucket,
+                                        ValidTxnList validTxnList,
+                                        Path baseDirectory, Path... deltaDirectory) throws IOException {
+
+      List<Path> filesToRead = new ArrayList<Path>();
+      if (baseDirectory != null) {
+        if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
+          Path p = AcidUtils.createBucketFile(baseDirectory, bucket);
+          FileSystem fs = p.getFileSystem(conf);
+          if (fs.exists(p)) filesToRead.add(p);
+        } else {
+          filesToRead.add(new Path(baseDirectory, "00000_0"));
+
+        }
+      }
+      for (int i = 0; i < deltaDirectory.length; i++) {
+        Path p = AcidUtils.createBucketFile(deltaDirectory[i], bucket);
+        FileSystem fs = p.getFileSystem(conf);
+        if (fs.exists(p)) filesToRead.add(p);
+      }
+      return new MockRawReader(conf, filesToRead);
+    }
+
+    @Override
+    public InputSplit[] getSplits(JobConf entries, int i) throws IOException {
+      return new InputSplit[0];
+    }
+
+    @Override
+    public RecordReader<NullWritable, Text> getRecordReader(InputSplit inputSplit, JobConf entries,
+                                                            Reporter reporter) throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws
+        IOException {
+      return false;
+    }
+  }
+
+  static class MockRawReader implements AcidInputFormat.RawReader<Text> {
+    private Stack<Path> filesToRead;
+    private Configuration conf;
+    private FSDataInputStream is = null;
+    private FileSystem fs;
+
+    MockRawReader(Configuration conf, List<Path> files) throws IOException {
+      filesToRead = new Stack<Path>();
+      for (Path file : files) filesToRead.push(file);
+      this.conf = conf;
+      fs = FileSystem.get(conf);
+    }
+
+    @Override
+    public ObjectInspector getObjectInspector() {
+      return null;
+    }
+
+    @Override
+    public boolean next(RecordIdentifier identifier, Text text) throws IOException {
+      if (is == null) {
+        // Open the next file
+        if (filesToRead.empty()) return false;
+        Path p = filesToRead.pop();
+        LOG.debug("Reading records from " + p.toString());
+        is = fs.open(p);
+      }
+      String line = null;
+      try {
+        identifier.readFields(is);
+        line = is.readLine();
+      } catch (EOFException e) {
+        // Hit the end of the current file; leave line null so we fall through and open the next one.
+      }
+      if (line == null) {
+        // Set our current entry to null (since it's done) and try again.
+        is = null;
+        return next(identifier, text);
+      }
+      text.set(line);
+      return true;
+    }
+
+    @Override
+    public RecordIdentifier createKey() {
+      return new RecordIdentifier();
+    }
+
+    @Override
+    public Text createValue() {
+      return new Text();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return 0;
+    }
+  }
+
+  // This class isn't used and I suspect does totally the wrong thing.  It's only here so that I
+  // can provide some output format to the tables and partitions I create.  I actually write to
+  // those tables' directories directly.
+  static class MockOutputFormat implements AcidOutputFormat<Text> {
+
+    @Override
+    public RecordUpdater getRecordUpdater(Path path, Options options) throws
+        IOException {
+      return null;
+    }
+
+    @Override
+    public FSRecordWriter getRawRecordWriter(Path path, Options options) throws IOException {
+      return new MockRecordWriter(path, options);
+    }
+
+    @Override
+    public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+                                              Class<? extends Writable> valueClass,
+                                              boolean isCompressed, Properties tableProperties,
+                                              Progressable progress) throws IOException {
+      return null;
+    }
+
+    @Override
+    public RecordWriter<NullWritable, Text> getRecordWriter(FileSystem fileSystem, JobConf entries,
+                                                            String s,
+                                                            Progressable progressable) throws
+        IOException {
+      return null;
+    }
+
+    @Override
+    public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOException {
+
+    }
+  }
+
+  // This class isn't used and I suspect does totally the wrong thing.  It's only here so that I
+  // can provide some output format to the tables and partitions I create.  I actually write to
+  // those tables' directories directly.
+  static class MockRecordWriter implements FSRecordWriter {
+    private FSDataOutputStream os;
+
+    MockRecordWriter(Path basedir, AcidOutputFormat.Options options) throws IOException {
+      FileSystem fs = FileSystem.get(options.getConfiguration());
+      Path p = AcidUtils.createFilename(basedir, options);
+      os = fs.create(p);
+    }
+
+    @Override
+    public void write(Writable w) throws IOException {
+      Text t = (Text)w;
+      os.writeBytes(t.toString());
+      os.writeBytes("\n");
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      os.close();
+    }
+  }
+
+
+}
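
The helper methods above (newTable, addBaseFile, addDeltaFile, burnThroughTransactions, and the start* methods) are meant to be driven from concrete subclasses such as the three test classes added below. As a rough illustration only, a minimal subclass might look like the sketch that follows; the class name TestExampleCompaction, the table name "example", and the transaction counts are made up for illustration and are not part of the commit.

    package org.apache.hadoop.hive.ql.txn.compactor;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.CompactionRequest;
    import org.apache.hadoop.hive.metastore.api.CompactionType;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.junit.Test;

    // Illustrative sketch of a CompactorTest subclass; names and counts are hypothetical.
    public class TestExampleCompaction extends CompactorTest {
      public TestExampleCompaction() throws Exception {
        super();
      }

      @Test
      public void compactUnpartitionedTable() throws Exception {
        HiveConf conf = new HiveConf();
        // Create a metastore table whose data lives under java.io.tmpdir/compactor_test_tables.
        Table t = newTable("default", "example", false);
        // One base covering transactions up to 20, plus two small deltas stacked on it.
        addBaseFile(conf, t, null, 20L, 20);
        addDeltaFile(conf, t, null, 21L, 22L, 2);
        addDeltaFile(conf, t, null, 23L, 24L, 2);
        // Open and commit enough transactions that the files above are visible.
        burnThroughTransactions(25);
        // Queue a major compaction and run one pass of the worker in this thread.
        txnHandler.compact(new CompactionRequest("default", "example", CompactionType.MAJOR));
        startWorker(conf);
      }
    }

The real tests below also add a @Before method that calls TxnDbUtil.setConfValues on a fresh HiveConf before each test.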

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for the compactor Cleaner thread.
+ */
+public class TestCleaner extends CompactorTest {
+  public TestCleaner() throws Exception {
+    super();
+  }
+
+  @Test
+  public void nothing() throws Exception {
+    // Test that the whole thing works when there's nothing in the queue.  This is just a
+    // survival test.
+    startCleaner(new HiveConf());
+  }
+
+  @Test
+  public void cleanupAfterMajorTableCompaction() throws Exception {
+    Table t = newTable("default", "camtc", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+    addBaseFile(conf, t, null, 25L, 25);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    startCleaner(conf);
+
+    // Check there are no compaction requests left.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertNull(rsp.getCompacts());
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_25", paths.get(0).getName());
+  }
+
+  @Test
+  public void cleanupAfterMajorPartitionCompaction() throws Exception {
+    Table t = newTable("default", "campc", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+    addBaseFile(conf, t, p, 25L, 25);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    startCleaner(conf);
+
+    // Check there are no compaction requests left.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertNull(rsp.getCompacts());
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, p);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_25", paths.get(0).getName());
+  }
+
+  @Test
+  public void cleanupAfterMinorTableCompaction() throws Exception {
+    Table t = newTable("default", "camitc", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+    addDeltaFile(conf, t, null, 21L, 24L, 4);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    startCleaner(conf);
+
+    // Check there are no compaction requests left.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertNull(rsp.getCompacts());
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(2, paths.size());
+    boolean sawBase = false, sawDelta = false;
+    for (Path p : paths) {
+      if (p.getName().equals("base_20")) sawBase = true;
+      else if (p.getName().equals("delta_21_24")) sawDelta = true;
+      else Assert.fail("Unexpected file " + p.getName());
+    }
+    Assert.assertTrue(sawBase);
+    Assert.assertTrue(sawDelta);
+  }
+
+  @Test
+  public void cleanupAfterMinorPartitionCompaction() throws Exception {
+    Table t = newTable("default", "camipc", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+    addDeltaFile(conf, t, p, 21L, 24L, 4);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    startCleaner(conf);
+
+    // Check there are no compaction requests left.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertNull(rsp.getCompacts());
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, p);
+    Assert.assertEquals(2, paths.size());
+    boolean sawBase = false, sawDelta = false;
+    for (Path path : paths) {
+      if (path.getName().equals("base_20")) sawBase = true;
+      else if (path.getName().equals("delta_21_24")) sawDelta = true;
+      else Assert.fail("Unexpected file " + path.getName());
+    }
+    Assert.assertTrue(sawBase);
+    Assert.assertTrue(sawDelta);
+  }
+
+  @Test
+  public void blockedByLockTable() throws Exception {
+    Table t = newTable("default", "bblt", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+    addDeltaFile(conf, t, null, 21L, 24L, 4);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default");
+    comp.setTablename("bblt");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+
+    startCleaner(conf);
+
+    // Check that the compaction is still queued as ready for cleaning, since the lock blocked the cleaner.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    Assert.assertEquals("bblt", compacts.get(0).getTablename());
+    Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void blockedByLockPartition() throws Exception {
+    Table t = newTable("default", "bblp", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+    addDeltaFile(conf, t, p, 21L, 24L, 4);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "bblp", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+    comp.setTablename("bblp");
+    comp.setPartitionname("ds=today");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+
+    startCleaner(conf);
+
+    // Check that the compaction is still queued as ready for cleaning, since the lock blocked the cleaner.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    Assert.assertEquals("bblp", compacts.get(0).getTablename());
+    Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+    Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception {
+    Table t = newTable("default", "campcnb", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addDeltaFile(conf, t, p, 1L, 22L, 22);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+    addBaseFile(conf, t, p, 25L, 25);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "campcnb", CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markCompacted(ci);
+    txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+    startCleaner(conf);
+
+    // Check there are no compaction requests left.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertNull(rsp.getCompacts());
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, p);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_25", paths.get(0).getName());
+  }
+
+  @Before
+  public void setUpTxnDb() throws Exception {
+    TxnDbUtil.setConfValues(new HiveConf());
+  }
+}
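
Each test above repeats the same queue manipulation before running the cleaner: file a CompactionRequest, claim it with findNextToCompact, mark it compacted, and record the run-as user. A sketch of a helper that could fold that sequence together follows; the method name markReadyForCleaning is illustrative, while the worker id "fred" is carried over from the tests.

    // Hypothetical helper for TestCleaner; mirrors the setup block used in every test above.
    private void markReadyForCleaning(String dbName, String tableName, String partName,
                                      CompactionType type) throws Exception {
      CompactionRequest rqst = new CompactionRequest(dbName, tableName, type);
      if (partName != null) rqst.setPartitionname(partName);
      txnHandler.compact(rqst);
      CompactionInfo ci = txnHandler.findNextToCompact("fred");
      txnHandler.markCompacted(ci);
      txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
    }

    // e.g. markReadyForCleaning("default", "camtc", null, CompactionType.MAJOR);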

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,634 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the compactor Initiator thread.
+ */
+public class TestInitiator extends CompactorTest {
+  static final private String CLASS_NAME = TestInitiator.class.getName();
+  static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  public TestInitiator() throws Exception {
+    super();
+  }
+
+  @Test
+  public void nothing() throws Exception {
+    // Test that the whole thing works when there's nothing in the queue.  This is just a
+    // survival test.
+    startInitiator(new HiveConf());
+  }
+
+  @Test
+  public void recoverFailedLocalWorkers() throws Exception {
+    Table t = newTable("default", "rflw1", false);
+    CompactionRequest rqst = new CompactionRequest("default", "rflw1", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+
+    t = newTable("default", "rflw2", false);
+    rqst = new CompactionRequest("default", "rflw2", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+
+    txnHandler.findNextToCompact(Worker.hostname() + "-193892");
+    txnHandler.findNextToCompact("nosuchhost-193892");
+
+    startInitiator(new HiveConf());
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(2, compacts.size());
+    boolean sawInitiated = false;
+    for (ShowCompactResponseElement c : compacts) {
+      if (c.getState().equals("working")) {
+        Assert.assertEquals("nosuchhost-193892", c.getWorkerid());
+      } else if (c.getState().equals("initiated")) {
+        sawInitiated = true;
+      } else {
+        Assert.fail("Unexpected state");
+      }
+    }
+    Assert.assertTrue(sawInitiated);
+  }
+
+  @Test
+  public void recoverFailedRemoteWorkers() throws Exception {
+    Table t = newTable("default", "rfrw1", false);
+    CompactionRequest rqst = new CompactionRequest("default", "rfrw1", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+
+    txnHandler.findNextToCompact("nosuchhost-193892");
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L);
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+  }
+
+  @Test
+  public void majorCompactOnTableTooManyAborts() throws Exception {
+    Table t = newTable("default", "mcottma", false);
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+    for (int i = 0; i < 11; i++) {
+      long txnid = openTxn();
+      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,  "default");
+      comp.setTablename("mcottma");
+      List<LockComponent> components = new ArrayList<LockComponent>(1);
+      components.add(comp);
+      LockRequest req = new LockRequest(components, "me", "localhost");
+      req.setTxnid(txnid);
+      LockResponse res = txnHandler.lock(req);
+      txnHandler.abortTxn(new AbortTxnRequest(txnid));
+    }
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("mcottma", compacts.get(0).getTablename());
+    Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void majorCompactOnPartitionTooManyAborts() throws Exception {
+    Table t = newTable("default", "mcoptma", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+    for (int i = 0; i < 11; i++) {
+      long txnid = openTxn();
+      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+      comp.setTablename("mcoptma");
+      comp.setPartitionname("ds=today");
+      List<LockComponent> components = new ArrayList<LockComponent>(1);
+      components.add(comp);
+      LockRequest req = new LockRequest(components, "me", "localhost");
+      req.setTxnid(txnid);
+      LockResponse res = txnHandler.lock(req);
+      txnHandler.abortTxn(new AbortTxnRequest(txnid));
+    }
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("mcoptma", compacts.get(0).getTablename());
+    Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+    Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void noCompactOnManyDifferentPartitionAborts() throws Exception {
+    Table t = newTable("default", "ncomdpa", true);
+    for (int i = 0; i < 11; i++) {
+      Partition p = newPartition(t, "day-" + i);
+    }
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+    for (int i = 0; i < 11; i++) {
+      long txnid = openTxn();
+      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,  "default");
+      comp.setTablename("ncomdpa");
+      comp.setPartitionname("ds=day-" + i);
+      List<LockComponent> components = new ArrayList<LockComponent>(1);
+      components.add(comp);
+      LockRequest req = new LockRequest(components, "me", "localhost");
+      req.setTxnid(txnid);
+      LockResponse res = txnHandler.lock(req);
+      txnHandler.abortTxn(new AbortTxnRequest(txnid));
+    }
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertNull(rsp.getCompacts());
+  }
+
+  @Test
+  public void cleanEmptyAbortedTxns() throws Exception {
+    // Test that we are cleaning aborted transactions with no components left in txn_components.
+    // Put one aborted transaction with an entry in txn_components to make sure we don't
+    // accidentally clean it too.
+    Table t = newTable("default", "ceat", false);
+
+    HiveConf conf = new HiveConf();
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+    comp.setTablename("ceat");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+    for (int i = 0; i < 100; i++) {
+      txnid = openTxn();
+      txnHandler.abortTxn(new AbortTxnRequest(txnid));
+    }
+    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(101, openTxns.getOpen_txnsSize());
+
+    startInitiator(conf);
+
+    openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(1, openTxns.getOpen_txnsSize());
+  }
+
+  @Test
+  public void noCompactWhenNoCompactSet() throws Exception {
+    Map<String, String> parameters = new HashMap<String, String>(1);
+    parameters.put("NO_AUTO_COMPACTION", "true");
+    Table t = newTable("default", "ncwncs", false, parameters);
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+    for (int i = 0; i < 11; i++) {
+      long txnid = openTxn();
+      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+      comp.setTablename("ncwncs");
+      List<LockComponent> components = new ArrayList<LockComponent>(1);
+      components.add(comp);
+      LockRequest req = new LockRequest(components, "me", "localhost");
+      req.setTxnid(txnid);
+      LockResponse res = txnHandler.lock(req);
+      txnHandler.abortTxn(new AbortTxnRequest(txnid));
+    }
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertNull(rsp.getCompacts());
+  }
+
+  @Test
+  public void noCompactWhenCompactAlreadyScheduled() throws Exception {
+    Table t = newTable("default", "ncwcas", false);
+
+    HiveConf conf = new HiveConf();
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
+
+    for (int i = 0; i < 11; i++) {
+      long txnid = openTxn();
+      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,  "default");
+      comp.setTablename("ncwcas");
+      List<LockComponent> components = new ArrayList<LockComponent>(1);
+      components.add(comp);
+      LockRequest req = new LockRequest(components, "me", "localhost");
+      req.setTxnid(txnid);
+      LockResponse res = txnHandler.lock(req);
+      txnHandler.abortTxn(new AbortTxnRequest(txnid));
+    }
+
+    CompactionRequest rqst = new CompactionRequest("default", "ncwcas", CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("ncwcas", compacts.get(0).getTablename());
+
+    startInitiator(conf);
+
+    rsp = txnHandler.showCompact(new ShowCompactRequest());
+    compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("ncwcas", compacts.get(0).getTablename());
+    Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void compactTableHighDeltaPct() throws Exception {
+    Table t = newTable("default", "cthdp", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+    burnThroughTransactions(23);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+    comp.setTablename("cthdp");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("cthdp", compacts.get(0).getTablename());
+    Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void compactPartitionHighDeltaPct() throws Exception {
+    Table t = newTable("default", "cphdp", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+
+    burnThroughTransactions(23);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+    comp.setTablename("cphdp");
+    comp.setPartitionname("ds=today");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("cphdp", compacts.get(0).getTablename());
+    Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+    Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void noCompactTableDeltaPctNotHighEnough() throws Exception {
+    Table t = newTable("default", "nctdpnhe", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 50L, 50);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+    burnThroughTransactions(53);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+    comp.setTablename("nctdpnhe");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertNull(rsp.getCompacts());
+  }
+
+  @Test
+  public void compactTableTooManyDeltas() throws Exception {
+    Table t = newTable("default", "cttmd", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 200L, 200);
+    addDeltaFile(conf, t, null, 201L, 201L, 1);
+    addDeltaFile(conf, t, null, 202L, 202L, 1);
+    addDeltaFile(conf, t, null, 203L, 203L, 1);
+    addDeltaFile(conf, t, null, 204L, 204L, 1);
+    addDeltaFile(conf, t, null, 205L, 205L, 1);
+    addDeltaFile(conf, t, null, 206L, 206L, 1);
+    addDeltaFile(conf, t, null, 207L, 207L, 1);
+    addDeltaFile(conf, t, null, 208L, 208L, 1);
+    addDeltaFile(conf, t, null, 209L, 209L, 1);
+    addDeltaFile(conf, t, null, 210L, 210L, 1);
+    addDeltaFile(conf, t, null, 211L, 211L, 1);
+
+    burnThroughTransactions(210);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+    comp.setTablename("cttmd");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("cttmd", compacts.get(0).getTablename());
+    Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void compactPartitionTooManyDeltas() throws Exception {
+    Table t = newTable("default", "cptmd", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 200L, 200);
+    addDeltaFile(conf, t, p, 201L, 201L, 1);
+    addDeltaFile(conf, t, p, 202L, 202L, 1);
+    addDeltaFile(conf, t, p, 203L, 203L, 1);
+    addDeltaFile(conf, t, p, 204L, 204L, 1);
+    addDeltaFile(conf, t, p, 205L, 205L, 1);
+    addDeltaFile(conf, t, p, 206L, 206L, 1);
+    addDeltaFile(conf, t, p, 207L, 207L, 1);
+    addDeltaFile(conf, t, p, 208L, 208L, 1);
+    addDeltaFile(conf, t, p, 209L, 209L, 1);
+    addDeltaFile(conf, t, p, 210L, 210L, 1);
+    addDeltaFile(conf, t, p, 211L, 211L, 1);
+
+    burnThroughTransactions(210);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+    comp.setTablename("cptmd");
+    comp.setPartitionname("ds=today");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("cptmd", compacts.get(0).getTablename());
+    Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+    Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void noCompactTableNotEnoughDeltas() throws Exception {
+    Table t = newTable("default", "nctned", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 200L, 200);
+    addDeltaFile(conf, t, null, 201L, 205L, 5);
+    addDeltaFile(conf, t, null, 206L, 211L, 6);
+
+    burnThroughTransactions(210);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+    comp.setTablename("nctned");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertNull(rsp.getCompacts());
+  }
+
+  @Test
+  public void chooseMajorOverMinorWhenBothValid() throws Exception {
+    Table t = newTable("default", "cmomwbv", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 200L, 200);
+    addDeltaFile(conf, t, null, 201L, 211L, 11);
+    addDeltaFile(conf, t, null, 212L, 222L, 11);
+    addDeltaFile(conf, t, null, 223L, 233L, 11);
+    addDeltaFile(conf, t, null, 234L, 244L, 11);
+    addDeltaFile(conf, t, null, 245L, 255L, 11);
+    addDeltaFile(conf, t, null, 256L, 266L, 11);
+    addDeltaFile(conf, t, null, 267L, 277L, 11);
+    addDeltaFile(conf, t, null, 278L, 288L, 11);
+    addDeltaFile(conf, t, null, 289L, 299L, 11);
+    addDeltaFile(conf, t, null, 300L, 310L, 11);
+    addDeltaFile(conf, t, null, 311L, 321L, 11);
+
+    burnThroughTransactions(320);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+    comp.setTablename("cmomwbv");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("cmomwbv", compacts.get(0).getTablename());
+    Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void enoughDeltasNoBase() throws Exception {
+    Table t = newTable("default", "ednb", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addDeltaFile(conf, t, p, 1L, 201L, 200);
+    addDeltaFile(conf, t, p, 202L, 202L, 1);
+    addDeltaFile(conf, t, p, 203L, 203L, 1);
+    addDeltaFile(conf, t, p, 204L, 204L, 1);
+    addDeltaFile(conf, t, p, 205L, 205L, 1);
+    addDeltaFile(conf, t, p, 206L, 206L, 1);
+    addDeltaFile(conf, t, p, 207L, 207L, 1);
+    addDeltaFile(conf, t, p, 208L, 208L, 1);
+    addDeltaFile(conf, t, p, 209L, 209L, 1);
+    addDeltaFile(conf, t, p, 210L, 210L, 1);
+    addDeltaFile(conf, t, p, 211L, 211L, 1);
+
+    burnThroughTransactions(210);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+    comp.setTablename("ednb");
+    comp.setPartitionname("ds=today");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("ednb", compacts.get(0).getTablename());
+    Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+    Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+  }
+
+  @Test
+  public void twoTxnsOnSamePartitionGenerateOneCompactionRequest() throws Exception {
+    Table t = newTable("default", "ttospgocr", true);
+    Partition p = newPartition(t, "today");
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+
+    burnThroughTransactions(23);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+    comp.setTablename("ttospgocr");
+    comp.setPartitionname("ds=today");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    txnid = openTxn();
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+    comp.setTablename("ttospgocr");
+    comp.setPartitionname("ds=today");
+    components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    res = txnHandler.lock(req);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("ttospgocr", compacts.get(0).getTablename());
+    Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+    Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+  }
+
+  // TODO test compactions with legacy file types
+
+  @Before
+  public void setUpTxnDb() throws Exception {
+    TxnDbUtil.setConfValues(new HiveConf());
+  }
+}
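
The initiator tests above set two configuration knobs explicitly; the delta-percentage and delta-count tests rely on the default values of the other hive.compactor.* settings. A small sketch of the configuration these tests build is shown below; the helper name initiatorTestConf is illustrative.

    // Hypothetical helper collecting the HiveConf settings set explicitly in the tests above.
    private HiveConf initiatorTestConf() {
      HiveConf conf = new HiveConf();
      // Number of aborted transactions on a table or partition before the initiator
      // requests a major compaction (see majorCompactOnTableTooManyAborts).
      HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
      // Timeout after which a worker is presumed dead and its compaction is re-initiated
      // (set very low in recoverFailedRemoteWorkers so recovery happens immediately).
      HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L);
      return conf;
    }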

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java?rev=1586488&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java Thu Apr 10 22:32:20 2014
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the worker thread and its MR jobs.
+ */
+public class TestWorker extends CompactorTest {
+  static final private String CLASS_NAME = TestWorker.class.getName();
+  static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  public TestWorker() throws Exception {
+    super();
+  }
+
+  @Test
+  public void nothing() throws Exception {
+    // Test that the whole thing works when there's nothing in the queue.  This is just a
+    // survival test.
+    startWorker(new HiveConf());
+  }
+
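+  /**
+   * StringableMap round-trips a Map through its string form.  The only encoding detail asserted
+   * here is that an empty map serializes to "0:"; non-empty maps (including null keys and null
+   * values) are only checked to survive a toString()/parse round trip.
+   */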
+  @Test
+  public void stringableMap() throws Exception {
+    // Empty map case
+    CompactorMR.StringableMap m = new CompactorMR.StringableMap(new HashMap<String, String>());
+    String s = m.toString();
+    Assert.assertEquals("0:", s);
+    m = new CompactorMR.StringableMap(s);
+    Assert.assertEquals(0, m.size());
+
+    Map<String, String> base = new HashMap<String, String>();
+    base.put("mary", "poppins");
+    base.put("bert", null);
+    base.put(null, "banks");
+    m = new CompactorMR.StringableMap(base);
+    s = m.toString();
+    m = new CompactorMR.StringableMap(s);
+    Assert.assertEquals(3, m.size());
+    Map<String, Boolean> saw = new HashMap<String, Boolean>(3);
+    saw.put("mary", false);
+    saw.put("bert", false);
+    saw.put(null, false);
+    for (Map.Entry<String, String> e : m.entrySet()) {
+      saw.put(e.getKey(), true);
+      if ("mary".equals(e.getKey())) Assert.assertEquals("poppins", e.getValue());
+      else if ("bert".equals(e.getKey())) Assert.assertNull(e.getValue());
+      else if (null == e.getKey()) Assert.assertEquals("banks", e.getValue());
+      else Assert.fail("Unexpected value " + e.getKey());
+    }
+    Assert.assertEquals(3, saw.size());
+    Assert.assertTrue(saw.get("mary"));
+    Assert.assertTrue(saw.get("bert"));
+    Assert.assertTrue(saw.get(null));
+  }
+
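+  /**
+   * StringableList is expected to serialize as an entry count followed by length-prefixed
+   * entries, e.g. "2:4:/tmp4:/usr" for two paths, with no guaranteed ordering (both orderings
+   * are accepted below).
+   */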
+  @Test
+  public void stringableList() throws Exception {
+    // Empty list case
+    CompactorMR.StringableList ls = new CompactorMR.StringableList();
+    String s = ls.toString();
+    Assert.assertEquals("0:", s);
+    ls = new CompactorMR.StringableList(s);
+    Assert.assertEquals(0, ls.size());
+
+    ls = new CompactorMR.StringableList();
+    ls.add(new Path("/tmp"));
+    ls.add(new Path("/usr"));
+    s = ls.toString();
+    Assert.assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s,
+        "2:4:/tmp4:/usr".equals(s) || "2:4:/usr4:/tmp".equals(s));
+    ls = new CompactorMR.StringableList(s);
+    Assert.assertEquals(2, ls.size());
+    boolean sawTmp = false, sawUsr = false;
+    for (Path p : ls) {
+      if ("/tmp".equals(p.toString())) sawTmp = true;
+      else if ("/usr".equals(p.toString())) sawUsr = true;
+      else Assert.fail("Unexpected path " + p.toString());
+    }
+    Assert.assertTrue(sawTmp);
+    Assert.assertTrue(sawUsr);
+  }
+
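+  /**
+   * CompactorInputSplit should report the total length of its files (ten 52-byte lines, i.e.
+   * 520 bytes here) and survive a write()/readFields() round trip with its bucket number, base
+   * directory, and delta directories intact.
+   */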
+  @Test
+  public void inputSplit() throws Exception {
+    String basename = "/warehouse/foo/base_1";
+    String delta1 = "/warehouse/foo/delta_2_3";
+    String delta2 = "/warehouse/foo/delta_4_7";
+
+    HiveConf conf = new HiveConf();
+    Path file = new Path(System.getProperty("java.io.tmpdir") +
+        System.getProperty("file.separator") + "newWriteInputSplitTest");
+    FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream os = fs.create(file);
+    for (int i = 0; i < 10; i++) {
+      os.writeBytes("mary had a little lamb its fleece was white as snow\n");
+    }
+    os.close();
+    List<Path> files = new ArrayList<Path>(1);
+    files.add(file);
+
+    Path[] deltas = new Path[2];
+    deltas[0] = new Path(delta1);
+    deltas[1] = new Path(delta2);
+
+    CompactorMR.CompactorInputSplit split =
+        new CompactorMR.CompactorInputSplit(conf, 3, files, new Path(basename), deltas);
+
+    Assert.assertEquals(520L, split.getLength());
+    String[] locations = split.getLocations();
+    Assert.assertEquals(1, locations.length);
+    Assert.assertEquals("localhost", locations[0]);
+
+    ByteArrayOutputStream buf = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(buf);
+    split.write(out);
+
+    split = new CompactorMR.CompactorInputSplit();
+    DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+    split.readFields(in);
+
+    Assert.assertEquals(3, split.getBucket());
+    Assert.assertEquals(basename, split.getBaseDir().toString());
+    deltas = split.getDeltaDirs();
+    Assert.assertEquals(2, deltas.length);
+    Assert.assertEquals(delta1, deltas[0].toString());
+    Assert.assertEquals(delta2, deltas[1].toString());
+  }
+
+  @Test
+  public void inputSplitNullBase() throws Exception {
+    String delta1 = "/warehouse/foo/delta_2_3";
+    String delta2 = "/warehouse/foo/delta_4_7";
+
+    HiveConf conf = new HiveConf();
+    Path file = new Path(System.getProperty("java.io.tmpdir") +
+        System.getProperty("file.separator") + "newWriteInputSplitTest");
+    FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream os = fs.create(file);
+    for (int i = 0; i < 10; i++) {
+      os.writeBytes("mary had a little lamb its fleece was white as snow\n");
+    }
+    os.close();
+    List<Path> files = new ArrayList<Path>(1);
+    files.add(file);
+
+    Path[] deltas = new Path[2];
+    deltas[0] = new Path(delta1);
+    deltas[1] = new Path(delta2);
+
+    CompactorMR.CompactorInputSplit split =
+        new CompactorMR.CompactorInputSplit(conf, 3, files, null, deltas);
+
+    ByteArrayOutputStream buf = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(buf);
+    split.write(out);
+
+    split = new CompactorMR.CompactorInputSplit();
+    DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+    split.readFields(in);
+
+    Assert.assertEquals(3, split.getBucket());
+    Assert.assertNull(split.getBaseDir());
+    deltas = split.getDeltaDirs();
+    Assert.assertEquals(2, deltas.length);
+    Assert.assertEquals(delta1, deltas[0].toString());
+    Assert.assertEquals(delta2, deltas[1].toString());
+  }
+
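+  /*
+   * Sorted tables and partitions should not be compacted, so the two tests below only check
+   * that the original base and delta directories are left untouched after the worker runs.
+   */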
+  @Test
+  public void sortedTable() throws Exception {
+    List<Order> sortCols = new ArrayList<Order>(1);
+    sortCols.add(new Order("b", 1));
+
+    Table t = newTable("default", "st", false, new HashMap<String, String>(), sortCols);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+    addDeltaFile(conf, t, null, 21L, 24L, 4);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "st", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+
+    startWorker(new HiveConf());
+
+    // There should still be four directories in the location.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+    Assert.assertEquals(4, stat.length);
+  }
+
+  @Test
+  public void sortedPartition() throws Exception {
+    List<Order> sortCols = new ArrayList<Order>(1);
+    sortCols.add(new Order("b", 1));
+
+    Table t = newTable("default", "sp", true, new HashMap<String, String>(), sortCols);
+    Partition p = newPartition(t, "today", sortCols);
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+    addDeltaFile(conf, t, p, 21L, 24L, 4);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "sp", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+
+    startWorker(new HiveConf());
+
+    // There should still be four directories in the location.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
+    Assert.assertEquals(4, stat.length);
+  }
+
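+  /*
+   * The remaining tests rely on the ACID directory naming used by the test helpers: base_N holds
+   * data compacted up through transaction N, and delta_M_N holds rows written by transactions M
+   * through N.  A minor compaction is expected to merge the deltas into a single delta spanning
+   * their full transaction range, while a major compaction writes a new base covering everything.
+   */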
+  @Test
+  public void minorTableWithBase() throws Exception {
+    LOG.debug("Starting minorTableWithBase");
+    Table t = newTable("default", "mtwb", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+
+    startWorker(conf);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // There should now be four directories in the location: the base, the two original deltas,
+    // and the new compacted delta.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+    Assert.assertEquals(4, stat.length);
+
+    // Find the new delta file and make sure it has the right contents
+    boolean sawNewDelta = false;
+    for (int i = 0; i < stat.length; i++) {
+      if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(208L, buckets[0].getLen());
+        Assert.assertEquals(208L, buckets[1].getLen());
+      } else {
+        LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+  }
+
+  @Test
+  public void minorPartitionWithBase() throws Exception {
+    Table t = newTable("default", "mpwb", true);
+    Partition p = newPartition(t, "today");
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "mpwb", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+
+    startWorker(new HiveConf());
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // There should now be four directories in the location: the base, the two original deltas,
+    // and the new compacted delta.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
+    Assert.assertEquals(4, stat.length);
+
+    // Find the new delta file and make sure it has the right contents
+    boolean sawNewDelta = false;
+    for (int i = 0; i < stat.length; i++) {
+      if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(208L, buckets[0].getLen());
+        Assert.assertEquals(208L, buckets[1].getLen());
+      } else {
+        LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+  }
+
+  @Test
+  public void minorTableNoBase() throws Exception {
+    LOG.debug("Starting minorTableWithBase");
+    Table t = newTable("default", "mtnb", false);
+
+    HiveConf conf = new HiveConf();
+
+    addDeltaFile(conf, t, null, 1L, 2L, 2);
+    addDeltaFile(conf, t, null, 3L, 4L, 2);
+
+    burnThroughTransactions(5);
+
+    CompactionRequest rqst = new CompactionRequest("default", "mtnb", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+
+    startWorker(new HiveConf());
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // There should now be three directories in the location: the two original deltas and the
+    // new compacted delta.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+    Assert.assertEquals(3, stat.length);
+
+    // Find the new delta file and make sure it has the right contents
+    boolean sawNewDelta = false;
+    for (int i = 0; i < stat.length; i++) {
+      if (stat[i].getPath().getName().equals("delta_0000001_0000004")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(208L, buckets[0].getLen());
+        Assert.assertEquals(208L, buckets[1].getLen());
+      } else {
+        LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+  }
+
+  @Test
+  public void majorTableWithBase() throws Exception {
+    LOG.debug("Starting majorTableWithBase");
+    Table t = newTable("default", "matwb", false);
+
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, null, 20L, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "matwb", CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+
+    startWorker(new HiveConf());
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // There should now be four directories in the location: the old base, the two deltas, and
+    // the new base.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+    Assert.assertEquals(4, stat.length);
+
+    // Find the new base file and make sure it has the right contents
+    boolean sawNewBase = false;
+    for (int i = 0; i < stat.length; i++) {
+      if (stat[i].getPath().getName().equals("base_0000024")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(1248L, buckets[0].getLen());
+        Assert.assertEquals(1248L, buckets[1].getLen());
+      } else {
+        LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+  }
+
+  @Test
+  public void majorPartitionWithBase() throws Exception {
+    LOG.debug("Starting majorPartitionWithBase");
+    Table t = newTable("default", "mapwb", true);
+    Partition p = newPartition(t, "today");
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20);
+    addDeltaFile(conf, t, p, 21L, 22L, 2);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "mapwb", CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+
+    startWorker(new HiveConf());
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // There should now be four directories in the location: the old base, the two deltas, and
+    // the new base.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
+    Assert.assertEquals(4, stat.length);
+
+    // Find the new base file and make sure it has the right contents
+    boolean sawNewBase = false;
+    for (int i = 0; i < stat.length; i++) {
+      if (stat[i].getPath().getName().equals("base_0000024")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(1248L, buckets[0].getLen());
+        Assert.assertEquals(1248L, buckets[1].getLen());
+      } else {
+        LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+  }
+
+  @Test
+  public void majorTableNoBase() throws Exception {
+    LOG.debug("Starting majorTableNoBase");
+    Table t = newTable("default", "matnb", false);
+
+    HiveConf conf = new HiveConf();
+
+    addDeltaFile(conf, t, null, 1L, 2L, 2);
+    addDeltaFile(conf, t, null, 3L, 4L, 2);
+
+    burnThroughTransactions(5);
+
+    CompactionRequest rqst = new CompactionRequest("default", "matnb", CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+
+    startWorker(new HiveConf());
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // There should now be three directories in the location: the two original deltas and the
+    // new base.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+    Assert.assertEquals(3, stat.length);
+
+    // Find the new base file and make sure it has the right contents
+    boolean sawNewBase = false;
+    for (int i = 0; i < stat.length; i++) {
+      if (stat[i].getPath().getName().equals("base_0000004")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(208L, buckets[0].getLen());
+        Assert.assertEquals(208L, buckets[1].getLen());
+      } else {
+        LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+  }
+
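+  // The two "legacy" tests start from pre-ACID data (created by addLegacyFile, which presumably
+  // writes flat bucket files rather than base_/delta_ directories), so the directory-count
+  // assertions are relaxed and only the newly written base or delta is verified.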
+  @Test
+  public void majorTableLegacy() throws Exception {
+    LOG.debug("Starting majorTableLegacy");
+    Table t = newTable("default", "matl", false);
+
+    HiveConf conf = new HiveConf();
+
+    addLegacyFile(conf, t, null, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "matl", CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+
+    startWorker(new HiveConf());
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // The location should now contain the new base in addition to the original legacy and delta
+    // files; the exact entry count is not asserted here.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+    //Assert.assertEquals(4, stat.length);
+
+    // Find the new base file and make sure it has the right contents
+    boolean sawNewBase = false;
+    for (int i = 0; i < stat.length; i++) {
+      if (stat[i].getPath().getName().equals("base_0000024")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(1248L, buckets[0].getLen());
+        Assert.assertEquals(1248L, buckets[1].getLen());
+      } else {
+        LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+  }
+
+  @Test
+  public void minorTableLegacy() throws Exception {
+    LOG.debug("Starting minorTableLegacy");
+    Table t = newTable("default", "mtl", false);
+
+    HiveConf conf = new HiveConf();
+
+    addLegacyFile(conf, t, null, 20);
+    addDeltaFile(conf, t, null, 21L, 22L, 2);
+    addDeltaFile(conf, t, null, 23L, 24L, 2);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "mtl", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+
+    startWorker(new HiveConf());
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // The location should now contain the new compacted delta in addition to the original
+    // legacy and delta files.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+
+    // Find the new delta file and make sure it has the right contents
+    boolean sawNewDelta = false;
+    for (int i = 0; i < stat.length; i++) {
+      if (stat[i].getPath().getName().equals("delta_0000021_0000024")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+      } else {
+        LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+  }
+
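+  /**
+   * Here the base and the first delta appear to be created with one of their two buckets missing
+   * (the extra arguments to addBaseFile/addDeltaFile), so after a major compaction the two
+   * buckets of the new base are expected to differ in size: one small (104 bytes) and one
+   * holding the full data set (1248 bytes), as asserted below.
+   */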
+  @Test
+  public void majorPartitionWithBaseMissingBuckets() throws Exception {
+    Table t = newTable("default", "mapwbmb", true);
+    Partition p = newPartition(t, "today");
+    HiveConf conf = new HiveConf();
+
+    addBaseFile(conf, t, p, 20L, 20, 2, false);
+    addDeltaFile(conf, t, p, 21L, 22L, 2, 2, false);
+    addDeltaFile(conf, t, p, 23L, 24L, 2);
+
+    burnThroughTransactions(25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "mapwbmb", CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+
+    startWorker(new HiveConf());
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // There should now be four directories in the location: the old base, the two deltas, and
+    // the new base.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
+    Assert.assertEquals(4, stat.length);
+
+    // Find the new base file and make sure it has the right contents
+    boolean sawNewBase = false;
+    for (int i = 0; i < stat.length; i++) {
+      if (stat[i].getPath().getName().equals("base_0000024")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        // Bucket 0 should be small and bucket 1 should be large, make sure that's the case
+        Assert.assertTrue(
+            ("bucket_00000".equals(buckets[0].getPath().getName()) && 104L == buckets[0].getLen()
+            && "bucket_00001".equals(buckets[1].getPath().getName()) && 1248L == buckets[1].getLen())
+            ||
+            ("bucket_00000".equals(buckets[1].getPath().getName()) && 104L == buckets[1].getLen()
+            && "bucket_00001".equals(buckets[0].getPath().getName()) && 1248L == buckets[0].getLen())
+        );
+      } else {
+        LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+  }
+
+  @Before
+  public void setUpTxnDb() throws Exception {
+    TxnDbUtil.setConfValues(new HiveConf());
+  }
+}