You are viewing a plain-text version of this content. The canonical link for it (a hyperlink labeled "here" in the original page) was not preserved in this version.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/02 20:53:54 UTC

[6/6] hive git commit: HIVE-16534 : Add capability to tell aborted transactions apart from open transactions in ValidTxnList (Wei Zheng, reviewed by Eugene Koifman)

HIVE-16534 : Add capability to tell aborted transactions apart from open transactions in ValidTxnList (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6af51245
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6af51245
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6af51245

Branch: refs/heads/master
Commit: 6af512457c36ec6abb994a7078bf5cd5686e0440
Parents: 0ffff40
Author: Wei Zheng <we...@apache.org>
Authored: Tue May 2 13:53:39 2017 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Tue May 2 13:53:39 2017 -0700

----------------------------------------------------------------------
 .../hive/common/ValidCompactorTxnList.java      |   11 +-
 .../hadoop/hive/common/ValidReadTxnList.java    |  115 +-
 .../apache/hadoop/hive/common/ValidTxnList.java |   18 +-
 .../hive/common/TestValidReadTxnList.java       |   29 +-
 .../hive/metastore/TestHiveMetaStoreTxns.java   |    8 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |   10 +
 metastore/if/hive_metastore.thrift              |    3 +-
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp  | 2080 +++++++++---------
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 1416 ++++++------
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |   11 +-
 .../hive/metastore/api/GetOpenTxnsResponse.java |  150 +-
 .../gen-php/metastore/ThriftHiveMetastore.php   | 1268 +++++------
 .../src/gen/thrift/gen-php/metastore/Types.php  |  535 ++---
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |   35 +-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |    7 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |   11 +-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |   13 +-
 .../txn/TestValidCompactorTxnList.java          |   63 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |    3 +-
 19 files changed, 3064 insertions(+), 2722 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
index 334b93e..8f55354 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.common;
 
 import java.util.Arrays;
+import java.util.BitSet;
 
 /**
  * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
@@ -40,11 +41,12 @@ public class ValidCompactorTxnList extends ValidReadTxnList {
   }
   /**
    * @param abortedTxnList list of all aborted transactions
+   * @param abortedBits bitset marking whether the corresponding transaction is aborted
    * @param highWatermark highest committed transaction to be considered for compaction,
    *                      equivalently (lowest_open_txn - 1).
    */
-  public ValidCompactorTxnList(long[] abortedTxnList, long highWatermark) {
-    super(abortedTxnList, highWatermark);
+  public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark) {
+    super(abortedTxnList, abortedBits, highWatermark); // abortedBits should be all true as everything in exceptions are aborted txns
     if(this.exceptions.length <= 0) {
       return;
     }
@@ -75,4 +77,9 @@ public class ValidCompactorTxnList extends ValidReadTxnList {
   public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
     return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
   }
+
+  @Override
+  public boolean isTxnAborted(long txnid) {
+    return Arrays.binarySearch(exceptions, txnid) >= 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index 2f35917..4e57772 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.common;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.util.Arrays;
+import java.util.BitSet;
 
 /**
  * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers.
@@ -30,32 +31,27 @@ import java.util.Arrays;
 public class ValidReadTxnList implements ValidTxnList {
 
   protected long[] exceptions;
+  protected BitSet abortedBits; // BitSet for flagging aborted transactions. Bit is true if aborted, false if open
   //default value means there are no open txn in the snapshot
   private long minOpenTxn = Long.MAX_VALUE;
   protected long highWatermark;
 
   public ValidReadTxnList() {
-    this(new long[0], Long.MAX_VALUE, Long.MAX_VALUE);
+    this(new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE);
   }
 
   /**
    * Used if there are no open transactions in the snapshot
    */
-  public ValidReadTxnList(long[] exceptions, long highWatermark) {
-    this(exceptions, highWatermark, Long.MAX_VALUE);
+  public ValidReadTxnList(long[] exceptions, BitSet abortedBits, long highWatermark) {
+    this(exceptions, abortedBits, highWatermark, Long.MAX_VALUE);
   }
-  public ValidReadTxnList(long[] exceptions, long highWatermark, long minOpenTxn) {
-    if (exceptions.length == 0) {
-      this.exceptions = exceptions;
-    } else {
-      this.exceptions = exceptions.clone();
-      Arrays.sort(this.exceptions);
+  public ValidReadTxnList(long[] exceptions, BitSet abortedBits, long highWatermark, long minOpenTxn) {
+    if (exceptions.length > 0) {
       this.minOpenTxn = minOpenTxn;
-      if(this.exceptions[0] <= 0) {
-        //should never happen of course
-        throw new IllegalArgumentException("Invalid txnid: " + this.exceptions[0] + " found");
-      }
     }
+    this.exceptions = exceptions;
+    this.abortedBits = abortedBits;
     this.highWatermark = highWatermark;
   }
 
@@ -118,12 +114,28 @@ public class ValidReadTxnList implements ValidTxnList {
     buf.append(':');
     buf.append(minOpenTxn);
     if (exceptions.length == 0) {
-      buf.append(':');
+      buf.append(':');  // separator for open txns
+      buf.append(':');  // separator for aborted txns
     } else {
-      for(long except: exceptions) {
-        buf.append(':');
-        buf.append(except);
+      StringBuilder open = new StringBuilder();
+      StringBuilder abort = new StringBuilder();
+      for (int i = 0; i < exceptions.length; i++) {
+        if (abortedBits.get(i)) {
+          if (abort.length() > 0) {
+            abort.append(',');
+          }
+          abort.append(exceptions[i]);
+        } else {
+          if (open.length() > 0) {
+            open.append(',');
+          }
+          open.append(exceptions[i]);
+        }
       }
+      buf.append(':');
+      buf.append(open);
+      buf.append(':');
+      buf.append(abort);
     }
     return buf.toString();
   }
@@ -133,13 +145,41 @@ public class ValidReadTxnList implements ValidTxnList {
     if (src == null || src.length() == 0) {
       highWatermark = Long.MAX_VALUE;
       exceptions = new long[0];
+      abortedBits = new BitSet();
     } else {
       String[] values = src.split(":");
       highWatermark = Long.parseLong(values[0]);
       minOpenTxn = Long.parseLong(values[1]);
-      exceptions = new long[values.length - 2];
-      for(int i = 2; i < values.length; ++i) {
-        exceptions[i-2] = Long.parseLong(values[i]);
+      String[] openTxns = new String[0];
+      String[] abortedTxns = new String[0];
+      if (values.length < 3) {
+        openTxns = new String[0];
+        abortedTxns = new String[0];
+      } else if (values.length == 3) {
+        if (!values[2].isEmpty()) {
+          openTxns = values[2].split(",");
+        }
+      } else {
+        if (!values[2].isEmpty()) {
+          openTxns = values[2].split(",");
+        }
+        if (!values[3].isEmpty()) {
+          abortedTxns = values[3].split(",");
+        }
+      }
+      exceptions = new long[openTxns.length + abortedTxns.length];
+      int i = 0;
+      for (String open : openTxns) {
+        exceptions[i++] = Long.parseLong(open);
+      }
+      for (String abort : abortedTxns) {
+        exceptions[i++] = Long.parseLong(abort);
+      }
+      Arrays.sort(exceptions);
+      abortedBits = new BitSet(exceptions.length);
+      for (String abort : abortedTxns) {
+        int index = Arrays.binarySearch(exceptions, Long.parseLong(abort));
+        abortedBits.set(index);
       }
     }
   }
@@ -157,5 +197,40 @@ public class ValidReadTxnList implements ValidTxnList {
   public long getMinOpenTxn() {
     return minOpenTxn;
   }
+
+  @Override
+  public boolean isTxnAborted(long txnid) {
+    int index = Arrays.binarySearch(exceptions, txnid);
+    return index >= 0 && abortedBits.get(index);
+  }
+
+  @Override
+  public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) {
+    // check the easy cases first
+    if (highWatermark < minTxnId) {
+      return RangeResponse.NONE;
+    }
+
+    int count = 0;  // number of aborted txns found in exceptions
+
+    // traverse the aborted txns list, starting at first aborted txn index
+    for (int i = abortedBits.nextSetBit(0); i >= 0; i = abortedBits.nextSetBit(i + 1)) {
+      long abortedTxnId = exceptions[i];
+      if (abortedTxnId > maxTxnId) {  // we've already gone beyond the specified range
+        break;
+      }
+      if (abortedTxnId >= minTxnId && abortedTxnId <= maxTxnId) {
+        count++;
+      }
+    }
+
+    if (count == 0) {
+      return RangeResponse.NONE;
+    } else if (count == (maxTxnId - minTxnId + 1)) {
+      return RangeResponse.ALL;
+    } else {
+      return RangeResponse.SOME;
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
index 5e1e4ee..d4ac02c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
@@ -71,7 +71,7 @@ public interface ValidTxnList {
 
   /**
    * Populate this validTxnList from the string.  It is assumed that the string
-   * was created via {@link #writeToString()}.
+   * was created via {@link #writeToString()} and the exceptions list is sorted.
    * @param src source string.
    */
   public void readFromString(String src);
@@ -89,4 +89,20 @@ public interface ValidTxnList {
    * @return a list of invalid transaction ids
    */
   public long[] getInvalidTransactions();
+
+  /**
+   * Indicates whether a given transaction is aborted.
+   * @param txnid id for the transaction
+   * @return true if aborted, false otherwise
+   */
+  public boolean isTxnAborted(long txnid);
+
+  /**
+   * Find out if a range of transaction ids are aborted.
+   * @param minTxnId minimum txnid to look for, inclusive
+   * @param maxTxnId maximum txnid to look for, inclusive
+   * @return Indicate whether none, some, or all of these transactions are aborted.
+   */
+  public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId);
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java b/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
index 6661158..00ee820 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.util.BitSet;
 
 /**
  * Tests for {@link ValidReadTxnList}
@@ -34,9 +35,9 @@ public class TestValidReadTxnList {
 
   @Test
   public void noExceptions() throws Exception {
-    ValidTxnList txnList = new ValidReadTxnList(new long[0], 1, Long.MAX_VALUE);
+    ValidTxnList txnList = new ValidReadTxnList(new long[0], new BitSet(), 1, Long.MAX_VALUE);
     String str = txnList.writeToString();
-    Assert.assertEquals("1:" + Long.MAX_VALUE + ":", str);
+    Assert.assertEquals("1:" + Long.MAX_VALUE + "::", str);
     ValidTxnList newList = new ValidReadTxnList();
     newList.readFromString(str);
     Assert.assertTrue(newList.isTxnValid(1));
@@ -45,9 +46,9 @@ public class TestValidReadTxnList {
 
   @Test
   public void exceptions() throws Exception {
-    ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5, 4L);
+    ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, new BitSet(), 5, 4L);
     String str = txnList.writeToString();
-    Assert.assertEquals("5:4:2:4", str);
+    Assert.assertEquals("5:4:2,4:", str);
     ValidTxnList newList = new ValidReadTxnList();
     newList.readFromString(str);
     Assert.assertTrue(newList.isTxnValid(1));
@@ -62,7 +63,7 @@ public class TestValidReadTxnList {
   public void longEnoughToCompress() throws Exception {
     long[] exceptions = new long[1000];
     for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
-    ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900);
+    ValidTxnList txnList = new ValidReadTxnList(exceptions, new BitSet(), 2000, 900);
     String str = txnList.writeToString();
     ValidTxnList newList = new ValidReadTxnList();
     newList.readFromString(str);
@@ -76,7 +77,7 @@ public class TestValidReadTxnList {
   public void readWriteConfig() throws Exception {
     long[] exceptions = new long[1000];
     for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
-    ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900);
+    ValidTxnList txnList = new ValidReadTxnList(exceptions, new BitSet(), 2000, 900);
     String str = txnList.writeToString();
     Configuration conf = new Configuration();
     conf.set(ValidTxnList.VALID_TXNS_KEY, str);
@@ -89,4 +90,20 @@ public class TestValidReadTxnList {
     newConf.readFields(in);
     Assert.assertEquals(str, newConf.get(ValidTxnList.VALID_TXNS_KEY));
   }
+
+  @Test
+  public void testAbortedTxn() throws Exception {
+    long[] exceptions = {2L, 4L, 6L, 8L, 10L};
+    BitSet bitSet = new BitSet(exceptions.length);
+    bitSet.set(0);  // mark txn "2L" aborted
+    bitSet.set(3);  // mark txn "8L" aborted
+    ValidTxnList txnList = new ValidReadTxnList(exceptions, bitSet, 11, 4L);
+    String str = txnList.writeToString();
+    Assert.assertEquals("11:4:4,6,10:2,8", str);
+    Assert.assertTrue(txnList.isTxnAborted(2L));
+    Assert.assertFalse(txnList.isTxnAborted(4L));
+    Assert.assertFalse(txnList.isTxnAborted(6L));
+    Assert.assertTrue(txnList.isTxnAborted(8L));
+    Assert.assertFalse(txnList.isTxnAborted(10L));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
index a0f18c6..1002be7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
@@ -128,7 +128,7 @@ public class TestHiveMetaStoreTxns {
     Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
         validTxns.isTxnRangeValid(5L, 10L));
 
-    validTxns = new ValidReadTxnList("10:5:4:5:6");
+    validTxns = new ValidReadTxnList("10:5:4,5,6:");
     Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
         validTxns.isTxnRangeValid(4,6));
     Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
@@ -223,15 +223,15 @@ public class TestHiveMetaStoreTxns {
   @Test
   public void stringifyValidTxns() throws Exception {
     // Test with just high water mark
-    ValidTxnList validTxns = new ValidReadTxnList("1:" + Long.MAX_VALUE + ":");
+    ValidTxnList validTxns = new ValidReadTxnList("1:" + Long.MAX_VALUE + "::");
     String asString = validTxns.toString();
-    Assert.assertEquals("1:" + Long.MAX_VALUE + ":", asString);
+    Assert.assertEquals("1:" + Long.MAX_VALUE + "::", asString);
     validTxns = new ValidReadTxnList(asString);
     Assert.assertEquals(1, validTxns.getHighWatermark());
     Assert.assertNotNull(validTxns.getInvalidTransactions());
     Assert.assertEquals(0, validTxns.getInvalidTransactions().length);
     asString = validTxns.toString();
-    Assert.assertEquals("1:" + Long.MAX_VALUE + ":", asString);
+    Assert.assertEquals("1:" + Long.MAX_VALUE + "::", asString);
     validTxns = new ValidReadTxnList(asString);
     Assert.assertEquals(1, validTxns.getHighWatermark());
     Assert.assertNotNull(validTxns.getInvalidTransactions());

http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index f92db7c..e0c05bd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1332,6 +1332,16 @@ public class TestCompactor {
       public boolean isValidBase(long txnid) {
         return true;
       }
+
+      @Override
+      public boolean isTxnAborted(long txnid) {
+        return true;
+      }
+
+      @Override
+      public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) {
+        return RangeResponse.ALL;
+      }
     };
 
     OrcInputFormat aif = new OrcInputFormat();

http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index ff66836..ca6a007 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -636,8 +636,9 @@ struct GetOpenTxnsInfoResponse {
 
 struct GetOpenTxnsResponse {
     1: required i64 txn_high_water_mark,
-    2: required set<i64> open_txns,
+    2: required list<i64> open_txns,  // set<i64> changed to list<i64> since 3.0
     3: optional i64 min_open_txn, //since 1.3,2.2
+    4: required binary abortedBits,   // since 3.0
 }
 
 struct OpenTxnRequest {