You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/07/18 21:44:30 UTC

hive git commit: HIVE-13369 AcidUtils.getAcidState() is not paying attention to ValidTxnList when choosing the best base file (Eugene Koifman, reviewed by Owen O'Malley)

Repository: hive
Updated Branches:
  refs/heads/master dfa1a5e1c -> 0896025e2


HIVE-13369 AcidUtils.getAcidState() is not paying attention to ValidTxnList when choosing the best base file (Eugene Koifman, reviewed by Owen O'Malley)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0896025e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0896025e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0896025e

Branch: refs/heads/master
Commit: 0896025e24869b6362c430a804c784b3a5a51e63
Parents: dfa1a5e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Jul 18 14:41:19 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Jul 18 14:41:19 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   2 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  60 +++++++++--
 .../hive/ql/txn/AcidOpenTxnsCounterService.java |   8 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 100 +++++++++++++++++--
 4 files changed, 154 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0896025e/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 52dadb7..6ed5b13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -449,6 +449,8 @@ public enum ErrorMsg {
   IMPORT_SEMANTIC_ERROR(10324, "Import Semantic Analyzer Error"),
   INVALID_FK_SYNTAX(10325, "Invalid Foreign Key syntax"),
   INVALID_PK_SYNTAX(10326, "Invalid Primary Key syntax"),
+  ACID_NOT_ENOUGH_HISTORY(10327, "Not enough history available for ({0},{1}).  " +
+    "Oldest available base: {2}", true),
   //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "

http://git-wip-us.apache.org/repos/asf/hive/blob/0896025e/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 36f38f6..c150ec5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -475,7 +476,9 @@ public class AcidUtils {
   /** State class for getChildState; cannot modify 2 things in a method. */
   private static class TxnBase {
     private FileStatus status;
-    private long txn;
+    private long txn = 0;
+    private long oldestBaseTxnId = Long.MAX_VALUE;
+    private Path oldestBase = null;
   }
 
   /**
@@ -571,6 +574,21 @@ public class AcidUtils {
       }
     }
 
+    if(bestBase.oldestBase != null && bestBase.status == null) {
+      /**
+       * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
+       * {@link txnList}.  Note that 'original' files are logically a base_Long.MIN_VALUE and thus
+       * cannot have any data for an open txn.  We could check {@link deltas} has files to cover
+       * [1,n] w/o gaps but this would almost never happen...*/
+      //todo: this should only care about 'open' txns (HIVE-14211)
+      long[] exceptions = txnList.getInvalidTransactions();
+      String minOpenTxn = exceptions != null && exceptions.length > 0 ?
+        Long.toString(exceptions[0]) : "x";
+      throw new IOException(ErrorMsg.ACID_NOT_ENOUGH_HISTORY.format(
+        Long.toString(txnList.getHighWatermark()),
+        minOpenTxn, bestBase.oldestBase.toString()));
+    }
+
     final Path base = bestBase.status == null ? null : bestBase.status.getPath();
     LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
         deltas.size());
@@ -598,7 +616,26 @@ public class AcidUtils {
       }
     };
   }
-
+  /**
+   * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view)
+   * A 'base' with open txn in its range doesn't have 'enough history' info to produce a correct
+   * snapshot for this reader.
+   * Note that such base is NOT obsolete.  Obsolete files are those that are "covered" by other
+   * files within the snapshot.
+   */
+  private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) {
+    /*This implementation is suboptimal.  It considers open/aborted txns invalid while we are only
+    * concerned with 'open' ones.  (Compaction removes any data that belongs to aborted txns and
+    * reads skip anything that belongs to aborted txn, thus base_7 is still OK if the only exception
+    * is txn 5 which is aborted).  So this implementation can generate false positives. (HIVE-14211)
+    * */
+    if(baseTxnId == Long.MIN_VALUE) {
+      //such base is created by 1st compaction in case of non-acid to acid table conversion
+      //By definition there are no open txns with id < 1.
+      return true;
+    }
+    return ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeValid(1, baseTxnId);
+  }
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
       List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) {
@@ -606,13 +643,22 @@ public class AcidUtils {
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
       long txn = parseBase(p);
+      if(bestBase.oldestBaseTxnId > txn) {
+        //keep track for error reporting
+        bestBase.oldestBase = p;
+        bestBase.oldestBaseTxnId = txn;
+      }
       if (bestBase.status == null) {
-        bestBase.status = child;
-        bestBase.txn = txn;
+        if(isValidBase(txn, txnList)) {
+          bestBase.status = child;
+          bestBase.txn = txn;
+        }
       } else if (bestBase.txn < txn) {
-        obsolete.add(bestBase.status);
-        bestBase.status = child;
-        bestBase.txn = txn;
+        if(isValidBase(txn, txnList)) {
+          obsolete.add(bestBase.status);
+          bestBase.status = child;
+          bestBase.txn = txn;
+        }
       } else {
         obsolete.add(child);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/0896025e/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
index f5eb8a1..08fcff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
@@ -47,6 +47,7 @@ public class AcidOpenTxnsCounterService extends HouseKeeperServiceBase {
     return "Count number of open transactions";
   }
   private static final class OpenTxnsCounter implements Runnable {
+    private static volatile long lastLogTime = 0;
     private final TxnStore txnHandler;
     private final AtomicInteger isAliveCounter;
     private OpenTxnsCounter(HiveConf hiveConf, AtomicInteger isAliveCounter) {
@@ -59,7 +60,12 @@ public class AcidOpenTxnsCounterService extends HouseKeeperServiceBase {
         long startTime = System.currentTimeMillis();
         txnHandler.countOpenTxns();
         int count = isAliveCounter.incrementAndGet();
-        LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds.  isAliveCounter=" + count);
+        if(System.currentTimeMillis() - lastLogTime > 60*1000) {
+          //don't flood the logs with too many msgs
+          LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime) / 1000 +
+            "seconds.  isAliveCounter=" + count);
+          lastLogTime = System.currentTimeMillis();
+        }
       }
       catch(Throwable t) {
         LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);

http://git-wip-us.apache.org/repos/asf/hive/blob/0896025e/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 5745dee..b83cea4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -27,8 +27,10 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -217,25 +219,107 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_98_100/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_120_130/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
-    assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString());
+    assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString());
+    assertEquals(1, dir.getCurrentDirectories().size());
+    assertEquals("mock:/tbl/part1/delta_120_130",
+      dir.getCurrentDirectories().get(0).getPath().toString());
     List<FileStatus> obsoletes = dir.getObsolete();
     assertEquals(4, obsoletes.size());
     assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/base_100", obsoletes.get(1).getPath().toString());
-    assertEquals("mock:/tbl/part1/base_25", obsoletes.get(2).getPath().toString());
-    assertEquals("mock:/tbl/part1/base_5", obsoletes.get(3).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString());
     assertEquals(0, dir.getOriginalFiles().size());
-    assertEquals(0, dir.getCurrentDirectories().size());
-    // we should always get the latest base
+
     dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:"));
-    assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString());
-  }
+    assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
+    assertEquals(0, dir.getCurrentDirectories().size());
+    obsoletes = dir.getObsolete();
+    assertEquals(1, obsoletes.size());
+    assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).getPath().toString());
+    assertEquals(0, dir.getOriginalFiles().size());
 
+    /*Single statement txns only: since we don't compact a txn range that includes an open txn,
+    the existence of delta_120_130 implies that 121 in the exception list is aborted unless
+    delta_120_130 is from streaming ingest in which case 121 can be open
+    (and thus 122-130 are open too)
+    For multi-statement txns, see HIVE-13369*/
+    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121"));
+    assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString());
+    assertEquals(1, dir.getCurrentDirectories().size());
+    assertEquals("mock:/tbl/part1/delta_120_130",
+      dir.getCurrentDirectories().get(0).getPath().toString());
+    obsoletes = dir.getObsolete();
+    assertEquals(4, obsoletes.size());
+    assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString());
+
+    boolean gotException = false;
+    try {
+      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5"));
+    }
+    catch(IOException e) {
+      gotException = true;
+      Assert.assertEquals("Not enough history available for (125,5).  Oldest available base: " +
+        "mock:/tbl/part1/base_5", e.getMessage());
+    }
+    Assert.assertTrue("Expected exception", gotException);
+
+    fs = new MockFileSystem(conf,
+      new MockFile("mock:/tbl/part1/delta_1_10/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_12_25/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
+    part = new MockPath(fs, "/tbl/part1");
+    try {
+      gotException = false;
+      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7"));
+    }
+    catch(IOException e) {
+      gotException = true;
+      Assert.assertEquals("Not enough history available for (150,7).  Oldest available base: " +
+        "mock:/tbl/part1/base_25", e.getMessage());
+    }
+    Assert.assertTrue("Expected exception", gotException);
+
+    fs = new MockFileSystem(conf,
+      new MockFile("mock:/tbl/part1/delta_2_10/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
+    part = new MockPath(fs, "/tbl/part1");
+    try {
+      gotException = false;
+      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7"));
+    }
+    catch(IOException e) {
+      gotException = true;
+      Assert.assertEquals("Not enough history available for (150,7).  Oldest available base: " +
+        "mock:/tbl/part1/base_25", e.getMessage());
+    }
+    Assert.assertTrue("Expected exception", gotException);
+
+    fs = new MockFileSystem(conf,
+      //non-acid to acid table conversion
+      new MockFile("mock:/tbl/part1/base_" + Long.MIN_VALUE + "/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
+    part = new MockPath(fs, "/tbl/part1");
+    //note that we don't include current txn of the client in exception list to read-your-writes
+    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:"));
+    assertEquals("mock:/tbl/part1/base_" + Long.MIN_VALUE, dir.getBaseDirectory().toString());
+    assertEquals(1, dir.getCurrentDirectories().size());
+    assertEquals("mock:/tbl/part1/delta_1_1", dir.getCurrentDirectories().get(0).getPath().toString());
+    assertEquals(0, dir.getObsolete().size());
+  }
   @Test
   public void testObsoleteOriginals() throws Exception {
     Configuration conf = new Configuration();