Posted to commits@hive.apache.org by ek...@apache.org on 2015/10/25 02:44:27 UTC

hive git commit: HIVE-11540 - Too many delta files during Compaction - OOM (Eugene Koifman, reviewed by Alan Gates)

Repository: hive
Updated Branches:
  refs/heads/master 24ec6beda -> e3ef96f2b


HIVE-11540 - Too many delta files during Compaction - OOM (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e3ef96f2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e3ef96f2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e3ef96f2

Branch: refs/heads/master
Commit: e3ef96f2b83ffa932dd59fc3df79dff8747309ba
Parents: 24ec6be
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 24 18:44:05 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 24 18:44:05 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  15 ++-
 .../hive/ql/txn/compactor/CompactorMR.java      |  96 ++++++++++-----
 .../hadoop/hive/ql/txn/compactor/Worker.java    |   6 +-
 .../hive/ql/txn/compactor/CompactorTest.java    |   4 +
 .../hive/ql/txn/compactor/TestWorker.java       | 120 +++++++++++++++++--
 6 files changed, 201 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e3ef96f2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f065048..dc79415 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1551,6 +1551,8 @@ public class HiveConf extends Configuration {
     HIVE_COMPACTOR_DELTA_PCT_THRESHOLD("hive.compactor.delta.pct.threshold", 0.1f,
         "Percentage (fractional) size of the delta files relative to the base that will trigger\n" +
         "a major compaction. (1.0 = 100%, so the default 0.1 = 10%.)"),
+    COMPACTOR_MAX_NUM_DELTA("hive.compactor.max.num.delta", 500, "Maximum number of delta files that " +
+      "the compactor will attempt to handle in a single job."),
 
     HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
         "Number of aborted transactions involving a given table or partition that will trigger\n" +

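Note on the new setting above: hive.compactor.max.num.delta caps how many delta files a single compactor job will read (default 500). It is read through HiveConf, so it can also be tuned programmatically; a minimal sketch using the ConfVars entry added in this patch (the value 250 is only an example):

    // Assumes a HiveConf instance visible to the compactor (e.g. in a test or embedded setup).
    HiveConf conf = new HiveConf();
    // Cap the number of delta files handled by one compaction job; default is 500.
    conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 250);
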
http://git-wip-us.apache.org/repos/asf/hive/blob/e3ef96f2/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 30db513..e8d070c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -132,6 +132,9 @@ public class AcidUtils {
     return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
   }
 
+  public static String baseDir(long txnId) {
+    return BASE_PREFIX + String.format(DELTA_DIGITS, txnId);
+  }
   /**
    * Create a filename for a bucket file.
    * @param directory the partition directory
@@ -221,14 +224,16 @@ public class AcidUtils {
     Path getBaseDirectory();
 
     /**
-     * Get the list of original files.
+     * Get the list of original files.  Not {@code null}.
      * @return the list of original files (eg. 000000_0)
      */
     List<HdfsFileStatusWithId> getOriginalFiles();
 
     /**
      * Get the list of base and delta directories that are valid and not
-     * obsolete.
+     * obsolete.  Not {@code null}.  The list must be sorted in a specific way.
+     * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta)}
+     * for details.
      * @return the minimal list of current directories
      */
     List<ParsedDelta> getCurrentDirectories();
@@ -237,7 +242,7 @@ public class AcidUtils {
      * Get the list of obsolete directories. After filtering out bases and
      * deltas that are not selected by the valid transaction list, return the
      * list of original files, bases, and deltas that have been replaced by
-     * more up to date ones.
+     * more up to date ones.  Not {@code null}.
      */
     List<FileStatus> getObsolete();
   }
@@ -284,6 +289,7 @@ public class AcidUtils {
      * happens in a different process; thus it's possible to have bases/deltas with
      * overlapping txnId boundaries.  The sort order helps figure out the "best" set of files
      * to use to get data.
+     * This sorts "wider" deltas before "narrower" ones, i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20).
      */
     @Override
     public int compareTo(ParsedDelta parsedDelta) {
@@ -499,6 +505,9 @@ public class AcidUtils {
     }
 
     Collections.sort(working);
+    //so now, 'working' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60, for example,
+    //and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60,
+    //subject to the list of 'exceptions' in 'txnList' (not shown in the above example).
     long current = bestBase.txn;
     int lastStmtId = -1;
     for(ParsedDelta next: working) {

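To illustrate the sort order documented above: ParsedDelta.compareTo orders deltas by ascending minimum transaction id, and for equal minimums the wider range wins. A minimal standalone sketch of that ordering (an illustrative helper, not the actual implementation, which may compare additional fields such as statement id):

    // Orders (min,max) txn ranges the way the comment above describes:
    // delta_5_20 before delta_5_10, and delta_5_20 before delta_11_20.
    static int compareTxnRange(long minA, long maxA, long minB, long maxB) {
      if (minA != minB) {
        return minA < minB ? -1 : 1;   // smaller min txn first
      }
      if (maxA != maxB) {
        return maxA > maxB ? -1 : 1;   // wider range (larger max txn) first
      }
      return 0;
    }
    // e.g. compareTxnRange(5, 20, 5, 10) < 0 and compareTxnRange(5, 20, 11, 20) < 0
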
http://git-wip-us.apache.org/repos/asf/hive/blob/e3ef96f2/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 391f99a..bab01a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -94,18 +95,8 @@ public class CompactorMR {
   public CompactorMR() {
   }
 
-  /**
-   * Run a compactor job.
-   * @param conf Hive configuration file
-   * @param jobName name to run this job with
-   * @param t metastore table
-   * @param sd metastore storage descriptor
-   * @param txns list of valid transactions
-   * @param isMajor is this a major compaction?
-   * @throws java.io.IOException if the job fails
-   */
-  void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
-           ValidTxnList txns, boolean isMajor, Worker.StatsUpdater su) throws IOException {
+  private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
+                                    ValidTxnList txns) {
     JobConf job = new JobConf(conf);
     job.setJobName(jobName);
     job.setOutputKeyClass(NullWritable.class);
@@ -117,7 +108,7 @@ public class CompactorMR {
     job.setInputFormat(CompactorInputFormat.class);
     job.setOutputFormat(NullOutputFormat.class);
     job.setOutputCommitter(CompactorOutputCommitter.class);
-    
+
     String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE);
     if(queueName != null && queueName.length() > 0) {
       job.setQueueName(queueName);
@@ -127,23 +118,63 @@ public class CompactorMR {
     job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString());
     job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
     job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
-    job.setBoolean(IS_MAJOR, isMajor);
     job.setBoolean(IS_COMPRESSED, sd.isCompressed());
     job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
     job.setInt(NUM_BUCKETS, sd.getNumBuckets());
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     setColumnTypes(job, sd.getCols());
+    return job;
+  }
+  /**
+   * Run Compaction which may consist of several jobs on the cluster.
+   * @param conf Hive configuration file
+   * @param jobName name to run this job with
+   * @param t metastore table
+   * @param sd metastore storage descriptor
+   * @param txns list of valid transactions
+   * @param ci CompactionInfo
+   * @throws java.io.IOException if the job fails
+   */
+  void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
+           ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
+    JobConf job = createBaseJobConf(conf, jobName, t, sd, txns);
 
     // Figure out and encode what files we need to read.  We do this here (rather than in
     // getSplits below) because as part of this we discover our minimum and maximum transactions,
     // and discovering that in getSplits is too late as we then have no way to pass it to our
     // mapper.
 
-    AcidUtils.Directory dir = AcidUtils.getAcidState(
-        new Path(sd.getLocation()), conf, txns, false);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns, false);
+    List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
+    int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
+    if(parsedDeltas.size() > maxDeltastoHandle) {
+      /**
+       * If we get here, it means we have a very high number of delta files.  This may be a sign of a
+       * temporary glitch or a real issue.  For example, if the transaction batch size or transaction size
+       * is set too low for the event flow rate in the Streaming API, it may generate lots of delta files
+       * very quickly.  Another possibility is that compaction is repeatedly failing and not actually
+       * compacting.  Thus, force N minor compactions first to reduce the number of deltas, and then follow
+       * up with the compaction actually requested in {@link ci}, which now needs to compact far fewer deltas.
+       */
+      LOG.warn(parsedDeltas.size() + " delta files found for " + ci.getFullPartitionName()
+        + " located at " + sd.getLocation() + "! This is likely a sign of misconfiguration, " +
+        "especially if this message repeats.  Check that compaction is running properly.  Check for any " +
+        "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
+      int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle;
+      for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
+        JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns);
+        launchCompactionJob(jobMinorCompact,
+          null, CompactionType.MINOR, null,
+          parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
+          maxDeltastoHandle, -1);
+      }
+      //now recompute the state since we've done minor compactions and have a different 'best' set of deltas
+      dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
+    }
+
     StringableList dirsToSearch = new StringableList();
     Path baseDir = null;
-    if (isMajor) {
+    if (ci.isMajorCompaction()) {
       // There may not be a base dir if the partition was empty before inserts or if this
       // partition is just now being converted to ACID.
       baseDir = dir.getBaseDirectory();
@@ -166,14 +197,26 @@ public class CompactorMR {
       }
     }
 
-    List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
-
-    if (parsedDeltas == null || parsedDeltas.size() == 0) {
+    if (parsedDeltas.size() == 0) {
       // Seriously, no deltas?  Can't compact that.
       LOG.error(  "No delta files found to compact in " + sd.getLocation());
+      //couldn't someone want to run a major compaction to convert an old table to ACID?
       return;
     }
 
+    launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
+      dir.getCurrentDirectories().size(), dir.getObsolete().size());
+
+    su.gatherStats();
+  }
+  private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
+                                   StringableList dirsToSearch,
+                                   List<AcidUtils.ParsedDelta> parsedDeltas,
+                                   int curDirNumber, int obsoleteDirNumber) throws IOException {
+    job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
+    if(dirsToSearch == null) {
+      dirsToSearch = new StringableList();
+    }
     StringableList deltaDirs = new StringableList();
     long minTxn = Long.MAX_VALUE;
     long maxTxn = Long.MIN_VALUE;
@@ -190,18 +233,15 @@ public class CompactorMR {
     job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
     job.setLong(MIN_TXN, minTxn);
     job.setLong(MAX_TXN, maxTxn);
-    LOG.debug("Setting minimum transaction to " + minTxn);
-    LOG.debug("Setting maximume transaction to " + maxTxn);
 
+    LOG.info("Submitting " + compactionType + " compaction job '" +
+      job.getJobName() + "' to " + job.getQueueName() + " queue.  " +
+      "(current delta dirs count=" + curDirNumber +
+      ", obsolete delta dirs count=" + obsoleteDirNumber + ").  TxnIdRange[" + minTxn + "," + maxTxn + "]");
     RunningJob rj = JobClient.runJob(job);
-    LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" +
-      jobName + "' with jobID=" + rj.getID() + " to " + job.getQueueName() + " queue.  " +
-      "(current delta dirs count=" + dir.getCurrentDirectories().size() +
-      ", obsolete delta dirs count=" + dir.getObsolete());
+    LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID());
     rj.waitForCompletion();
-    su.gatherStats();
   }
-
   /**
    * Set the column names and types into the job conf for the input format
    * to use.

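To make the batching above concrete: with the default hive.compactor.max.num.delta=500, a partition holding 1,201 current deltas would first get 1201 / 500 = 2 internal minor compaction jobs of 500 deltas each; the originally requested compaction then runs over the remaining 201 deltas plus the 2 newly produced ones. A small sketch of that arithmetic (hypothetical helper name, mirroring the integer division in CompactorMR.run above):

    // Number of "pre" minor compaction jobs launched before the requested compaction.
    static int numPreMinorCompactions(int currentDeltaCount, int maxDeltasPerJob) {
      // Integer division: only full batches are pre-compacted; the remainder is left
      // for the originally requested compaction job.
      return currentDeltaCount > maxDeltasPerJob ? currentDeltaCount / maxDeltasPerJob : 0;
    }
    // e.g. numPreMinorCompactions(1201, 500) == 2, numPreMinorCompactions(500, 500) == 0
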
http://git-wip-us.apache.org/repos/asf/hive/blob/e3ef96f2/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 0548117..cc7441a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -77,7 +77,7 @@ public class Worker extends CompactorThread {
       // Make sure nothing escapes this run method and kills the metastore at large,
       // so wrap it in a big catch Throwable statement.
       try {
-        CompactionInfo ci = txnHandler.findNextToCompact(name);
+        final CompactionInfo ci = txnHandler.findNextToCompact(name);
 
         if (ci == null && !stop.get()) {
           try {
@@ -158,14 +158,14 @@ public class Worker extends CompactorThread {
         launchedJob = true;
         try {
           if (runJobAsSelf(runAs)) {
-            mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
+            mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
           } else {
             UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
               UserGroupInformation.getLoginUser());
             ugi.doAs(new PrivilegedExceptionAction<Object>() {
               @Override
               public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su);
+                mr.run(conf, jobName.toString(), t, sd, txns, ci, su);
                 return null;
               }
             });

http://git-wip-us.apache.org/repos/asf/hive/blob/e3ef96f2/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 5a8c932..39c0571 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -516,6 +516,10 @@ public abstract class CompactorTest {
   abstract boolean useHive130DeltaDirName();
 
   String makeDeltaDirName(long minTxnId, long maxTxnId) {
+    if(minTxnId != maxTxnId) {
+      //covers both the Streaming API and post-compaction naming styles.
+      return makeDeltaDirNameCompacted(minTxnId, maxTxnId);
+    }
     return useHive130DeltaDirName() ?
       AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e3ef96f2/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 11e5333..245e839 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -29,6 +30,7 @@ import org.junit.Test;
 import java.io.*;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -37,6 +39,10 @@ import java.util.Map;
 
 /**
  * Tests for the worker thread and its MR jobs.
+ * todo: most delta files in this test suite use a txn id range, i.e. [N,N+M].
+ * That means that they all look like they were created by compaction or by the Streaming API.
+ * Delta files created by SQL should have an [N,N] range (and a suffix in v1.3 and later).
+ * Need to change some of these to have better test coverage.
  */
 public class TestWorker extends CompactorTest {
   static final private String CLASS_NAME = TestWorker.class.getName();
@@ -325,18 +331,14 @@ public class TestWorker extends CompactorTest {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    boolean is130 = this instanceof TestWorker2;
-    Assert.assertEquals(is130 ? 5 : 4, stat.length);
+    Assert.assertEquals(4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    if(is130) {//in1.3.0 orig delta is delta_00021_00022_0000 and compacted one is delta_00021_00022...
-      Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
-    }
-    Assert.assertEquals(makeDeltaDirName(21, 22), stat[1 + (is130 ? 1 : 0)].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[2 + (is130 ? 1 : 0)].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[3 + (is130 ? 1 : 0)].getPath().getName());
+    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
   }
 
   @Test
@@ -508,6 +510,108 @@ public class TestWorker extends CompactorTest {
   }
 
   @Test
+  public void minorNoBaseLotsOfDeltas() throws Exception {
+    compactNoBaseLotsOfDeltas(CompactionType.MINOR);
+  }
+  @Test
+  public void majorNoBaseLotsOfDeltas() throws Exception {
+    compactNoBaseLotsOfDeltas(CompactionType.MAJOR);
+  }
+  private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception {
+    conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 2);
+    Table t = newTable("default", "mapwb", true);
+    Partition p = newPartition(t, "today");
+
+//    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 21L, 2);
+    addDeltaFile(t, p, 23L, 23L, 2);
+    //make it look like streaming API use case
+    addDeltaFile(t, p, 25L, 29L, 2);
+    addDeltaFile(t, p, 31L, 32L, 3);
+    //make it look like 31-32 has been compacted, but not cleaned
+    addDeltaFile(t, p, 31L, 33L, 5);
+    addDeltaFile(t, p, 35L, 35L, 1);
+
+    /* Since COMPACTOR_MAX_NUM_DELTA=2, we expect:
+     * files 1,2 to be minor compacted by the 1st job to produce delta_21_23,
+     * files 3,5 to be minor compacted by the 2nd job to produce delta_25_33 (file 4 is obsolete and skipped),
+     * and then the 'requested' minor compaction to combine delta_21_23, delta_25_33 and delta_35_35
+     * into delta_21_35, or the major compaction to create base_35. */
+    burnThroughTransactions(35);
+    CompactionRequest rqst = new CompactionRequest("default", "mapwb", type);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+
+    startWorker();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
+    Assert.assertEquals(9, stat.length);
+
+    // Find the new delta file and make sure it has the right contents
+    BitSet matchesFound = new BitSet(9);
+    for (int i = 0; i < stat.length; i++) {
+      if(stat[i].getPath().getName().equals(makeDeltaDirName(21,21))) {
+        matchesFound.set(0);
+      }
+      else if(stat[i].getPath().getName().equals(makeDeltaDirName(23, 23))) {
+        matchesFound.set(1);
+      }
+      else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25, 29))) {
+        matchesFound.set(2);
+      }
+      else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 32))) {
+        matchesFound.set(3);
+      }
+      else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 33))) {
+        matchesFound.set(4);
+      }
+      else if(stat[i].getPath().getName().equals(makeDeltaDirName(35, 35))) {
+        matchesFound.set(5);
+      }
+      else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,23))) {
+        matchesFound.set(6);
+      }
+      else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25,33))) {
+        matchesFound.set(7);
+      }
+      switch (type) {
+        //yes, both do set(8)
+        case MINOR:
+          if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,35))) {
+            matchesFound.set(8);
+          }
+          break;
+        case MAJOR:
+          if(stat[i].getPath().getName().equals(AcidUtils.baseDir(35))) {
+            matchesFound.set(8);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+    StringBuilder sb = null;
+    for(int i = 0; i < stat.length; i++) {
+      if(!matchesFound.get(i)) {
+        if(sb == null) {
+          sb = new StringBuilder("Some files are missing at index: ");
+        }
+        sb.append(i).append(",");
+      }
+    }
+    if (sb != null) {
+      Assert.assertTrue(sb.toString(), false);
+    }
+  }
+  @Test
   public void majorPartitionWithBase() throws Exception {
     LOG.debug("Starting majorPartitionWithBase");
     Table t = newTable("default", "mapwb", true);