You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/02 20:53:54 UTC
[6/6] hive git commit: HIVE-16534 : Add capability to tell aborted
transactions apart from open transactions in ValidTxnList (Wei Zheng,
reviewed by Eugene Koifman)
HIVE-16534 : Add capability to tell aborted transactions apart from open transactions in ValidTxnList (Wei Zheng, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6af51245
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6af51245
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6af51245
Branch: refs/heads/master
Commit: 6af512457c36ec6abb994a7078bf5cd5686e0440
Parents: 0ffff40
Author: Wei Zheng <we...@apache.org>
Authored: Tue May 2 13:53:39 2017 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Tue May 2 13:53:39 2017 -0700
----------------------------------------------------------------------
.../hive/common/ValidCompactorTxnList.java | 11 +-
.../hadoop/hive/common/ValidReadTxnList.java | 115 +-
.../apache/hadoop/hive/common/ValidTxnList.java | 18 +-
.../hive/common/TestValidReadTxnList.java | 29 +-
.../hive/metastore/TestHiveMetaStoreTxns.java | 8 +-
.../hive/ql/txn/compactor/TestCompactor.java | 10 +
metastore/if/hive_metastore.thrift | 3 +-
.../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2080 +++++++++---------
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 1416 ++++++------
.../gen/thrift/gen-cpp/hive_metastore_types.h | 11 +-
.../hive/metastore/api/GetOpenTxnsResponse.java | 150 +-
.../gen-php/metastore/ThriftHiveMetastore.php | 1268 +++++------
.../src/gen/thrift/gen-php/metastore/Types.php | 535 ++---
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 35 +-
.../gen/thrift/gen-rb/hive_metastore_types.rb | 7 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 11 +-
.../hadoop/hive/metastore/txn/TxnUtils.java | 13 +-
.../txn/TestValidCompactorTxnList.java | 63 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 3 +-
19 files changed, 3064 insertions(+), 2722 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
index 334b93e..8f55354 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.common;
import java.util.Arrays;
+import java.util.BitSet;
/**
* An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
@@ -40,11 +41,12 @@ public class ValidCompactorTxnList extends ValidReadTxnList {
}
/**
* @param abortedTxnList list of all aborted transactions
+ * @param abortedBits bitset marking whether the corresponding transaction is aborted
* @param highWatermark highest committed transaction to be considered for compaction,
* equivalently (lowest_open_txn - 1).
*/
- public ValidCompactorTxnList(long[] abortedTxnList, long highWatermark) {
- super(abortedTxnList, highWatermark);
+ public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark) {
+ super(abortedTxnList, abortedBits, highWatermark); // abortedBits should be all true as everything in exceptions are aborted txns
if(this.exceptions.length <= 0) {
return;
}
@@ -75,4 +77,9 @@ public class ValidCompactorTxnList extends ValidReadTxnList {
public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
}
+
+ @Override
+ public boolean isTxnAborted(long txnid) {
+ return Arrays.binarySearch(exceptions, txnid) >= 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index 2f35917..4e57772 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.common;
import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
+import java.util.BitSet;
/**
* An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers.
@@ -30,32 +31,27 @@ import java.util.Arrays;
public class ValidReadTxnList implements ValidTxnList {
protected long[] exceptions;
+ protected BitSet abortedBits; // BitSet for flagging aborted transactions. Bit is true if aborted, false if open
//default value means there are no open txn in the snapshot
private long minOpenTxn = Long.MAX_VALUE;
protected long highWatermark;
public ValidReadTxnList() {
- this(new long[0], Long.MAX_VALUE, Long.MAX_VALUE);
+ this(new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE);
}
/**
* Used if there are no open transactions in the snapshot
*/
- public ValidReadTxnList(long[] exceptions, long highWatermark) {
- this(exceptions, highWatermark, Long.MAX_VALUE);
+ public ValidReadTxnList(long[] exceptions, BitSet abortedBits, long highWatermark) {
+ this(exceptions, abortedBits, highWatermark, Long.MAX_VALUE);
}
- public ValidReadTxnList(long[] exceptions, long highWatermark, long minOpenTxn) {
- if (exceptions.length == 0) {
- this.exceptions = exceptions;
- } else {
- this.exceptions = exceptions.clone();
- Arrays.sort(this.exceptions);
+ public ValidReadTxnList(long[] exceptions, BitSet abortedBits, long highWatermark, long minOpenTxn) {
+ if (exceptions.length > 0) {
this.minOpenTxn = minOpenTxn;
- if(this.exceptions[0] <= 0) {
- //should never happen of course
- throw new IllegalArgumentException("Invalid txnid: " + this.exceptions[0] + " found");
- }
}
+ this.exceptions = exceptions;
+ this.abortedBits = abortedBits;
this.highWatermark = highWatermark;
}
@@ -118,12 +114,28 @@ public class ValidReadTxnList implements ValidTxnList {
buf.append(':');
buf.append(minOpenTxn);
if (exceptions.length == 0) {
- buf.append(':');
+ buf.append(':'); // separator for open txns
+ buf.append(':'); // separator for aborted txns
} else {
- for(long except: exceptions) {
- buf.append(':');
- buf.append(except);
+ StringBuilder open = new StringBuilder();
+ StringBuilder abort = new StringBuilder();
+ for (int i = 0; i < exceptions.length; i++) {
+ if (abortedBits.get(i)) {
+ if (abort.length() > 0) {
+ abort.append(',');
+ }
+ abort.append(exceptions[i]);
+ } else {
+ if (open.length() > 0) {
+ open.append(',');
+ }
+ open.append(exceptions[i]);
+ }
}
+ buf.append(':');
+ buf.append(open);
+ buf.append(':');
+ buf.append(abort);
}
return buf.toString();
}
@@ -133,13 +145,41 @@ public class ValidReadTxnList implements ValidTxnList {
if (src == null || src.length() == 0) {
highWatermark = Long.MAX_VALUE;
exceptions = new long[0];
+ abortedBits = new BitSet();
} else {
String[] values = src.split(":");
highWatermark = Long.parseLong(values[0]);
minOpenTxn = Long.parseLong(values[1]);
- exceptions = new long[values.length - 2];
- for(int i = 2; i < values.length; ++i) {
- exceptions[i-2] = Long.parseLong(values[i]);
+ String[] openTxns = new String[0];
+ String[] abortedTxns = new String[0];
+ if (values.length < 3) {
+ openTxns = new String[0];
+ abortedTxns = new String[0];
+ } else if (values.length == 3) {
+ if (!values[2].isEmpty()) {
+ openTxns = values[2].split(",");
+ }
+ } else {
+ if (!values[2].isEmpty()) {
+ openTxns = values[2].split(",");
+ }
+ if (!values[3].isEmpty()) {
+ abortedTxns = values[3].split(",");
+ }
+ }
+ exceptions = new long[openTxns.length + abortedTxns.length];
+ int i = 0;
+ for (String open : openTxns) {
+ exceptions[i++] = Long.parseLong(open);
+ }
+ for (String abort : abortedTxns) {
+ exceptions[i++] = Long.parseLong(abort);
+ }
+ Arrays.sort(exceptions);
+ abortedBits = new BitSet(exceptions.length);
+ for (String abort : abortedTxns) {
+ int index = Arrays.binarySearch(exceptions, Long.parseLong(abort));
+ abortedBits.set(index);
}
}
}
@@ -157,5 +197,40 @@ public class ValidReadTxnList implements ValidTxnList {
public long getMinOpenTxn() {
return minOpenTxn;
}
+
+ @Override
+ public boolean isTxnAborted(long txnid) {
+ int index = Arrays.binarySearch(exceptions, txnid);
+ return index >= 0 && abortedBits.get(index);
+ }
+
+ @Override
+ public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) {
+ // check the easy cases first
+ if (highWatermark < minTxnId) {
+ return RangeResponse.NONE;
+ }
+
+ int count = 0; // number of aborted txns found in exceptions
+
+ // traverse the aborted txns list, starting at first aborted txn index
+ for (int i = abortedBits.nextSetBit(0); i >= 0; i = abortedBits.nextSetBit(i + 1)) {
+ long abortedTxnId = exceptions[i];
+ if (abortedTxnId > maxTxnId) { // we've already gone beyond the specified range
+ break;
+ }
+ if (abortedTxnId >= minTxnId && abortedTxnId <= maxTxnId) {
+ count++;
+ }
+ }
+
+ if (count == 0) {
+ return RangeResponse.NONE;
+ } else if (count == (maxTxnId - minTxnId + 1)) {
+ return RangeResponse.ALL;
+ } else {
+ return RangeResponse.SOME;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
index 5e1e4ee..d4ac02c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
@@ -71,7 +71,7 @@ public interface ValidTxnList {
/**
* Populate this validTxnList from the string. It is assumed that the string
- * was created via {@link #writeToString()}.
+ * was created via {@link #writeToString()} and that the exceptions list is sorted.
* @param src source string.
*/
public void readFromString(String src);
@@ -89,4 +89,20 @@ public interface ValidTxnList {
* @return a list of invalid transaction ids
*/
public long[] getInvalidTransactions();
+
+ /**
+ * Indicates whether a given transaction is aborted.
+ * @param txnid id for the transaction
+ * @return true if aborted, false otherwise
+ */
+ public boolean isTxnAborted(long txnid);
+
+ /**
+ * Find out whether the transaction ids in a given range are aborted.
+ * @param minTxnId minimum txnid to look for, inclusive
+ * @param maxTxnId maximum txnid to look for, inclusive
+ * @return whether none, some, or all of these transactions are aborted.
+ */
+ public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId);
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java b/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
index 6661158..00ee820 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.util.BitSet;
/**
* Tests for {@link ValidReadTxnList}
@@ -34,9 +35,9 @@ public class TestValidReadTxnList {
@Test
public void noExceptions() throws Exception {
- ValidTxnList txnList = new ValidReadTxnList(new long[0], 1, Long.MAX_VALUE);
+ ValidTxnList txnList = new ValidReadTxnList(new long[0], new BitSet(), 1, Long.MAX_VALUE);
String str = txnList.writeToString();
- Assert.assertEquals("1:" + Long.MAX_VALUE + ":", str);
+ Assert.assertEquals("1:" + Long.MAX_VALUE + "::", str);
ValidTxnList newList = new ValidReadTxnList();
newList.readFromString(str);
Assert.assertTrue(newList.isTxnValid(1));
@@ -45,9 +46,9 @@ public class TestValidReadTxnList {
@Test
public void exceptions() throws Exception {
- ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5, 4L);
+ ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, new BitSet(), 5, 4L);
String str = txnList.writeToString();
- Assert.assertEquals("5:4:2:4", str);
+ Assert.assertEquals("5:4:2,4:", str);
ValidTxnList newList = new ValidReadTxnList();
newList.readFromString(str);
Assert.assertTrue(newList.isTxnValid(1));
@@ -62,7 +63,7 @@ public class TestValidReadTxnList {
public void longEnoughToCompress() throws Exception {
long[] exceptions = new long[1000];
for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
- ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900);
+ ValidTxnList txnList = new ValidReadTxnList(exceptions, new BitSet(), 2000, 900);
String str = txnList.writeToString();
ValidTxnList newList = new ValidReadTxnList();
newList.readFromString(str);
@@ -76,7 +77,7 @@ public class TestValidReadTxnList {
public void readWriteConfig() throws Exception {
long[] exceptions = new long[1000];
for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
- ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900);
+ ValidTxnList txnList = new ValidReadTxnList(exceptions, new BitSet(), 2000, 900);
String str = txnList.writeToString();
Configuration conf = new Configuration();
conf.set(ValidTxnList.VALID_TXNS_KEY, str);
@@ -89,4 +90,20 @@ public class TestValidReadTxnList {
newConf.readFields(in);
Assert.assertEquals(str, newConf.get(ValidTxnList.VALID_TXNS_KEY));
}
+
+ @Test
+ public void testAbortedTxn() throws Exception {
+ long[] exceptions = {2L, 4L, 6L, 8L, 10L};
+ BitSet bitSet = new BitSet(exceptions.length);
+ bitSet.set(0); // mark txn "2L" aborted
+ bitSet.set(3); // mark txn "8L" aborted
+ ValidTxnList txnList = new ValidReadTxnList(exceptions, bitSet, 11, 4L);
+ String str = txnList.writeToString();
+ Assert.assertEquals("11:4:4,6,10:2,8", str);
+ Assert.assertTrue(txnList.isTxnAborted(2L));
+ Assert.assertFalse(txnList.isTxnAborted(4L));
+ Assert.assertFalse(txnList.isTxnAborted(6L));
+ Assert.assertTrue(txnList.isTxnAborted(8L));
+ Assert.assertFalse(txnList.isTxnAborted(10L));
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
index a0f18c6..1002be7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
@@ -128,7 +128,7 @@ public class TestHiveMetaStoreTxns {
Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
validTxns.isTxnRangeValid(5L, 10L));
- validTxns = new ValidReadTxnList("10:5:4:5:6");
+ validTxns = new ValidReadTxnList("10:5:4,5,6:");
Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
validTxns.isTxnRangeValid(4,6));
Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
@@ -223,15 +223,15 @@ public class TestHiveMetaStoreTxns {
@Test
public void stringifyValidTxns() throws Exception {
// Test with just high water mark
- ValidTxnList validTxns = new ValidReadTxnList("1:" + Long.MAX_VALUE + ":");
+ ValidTxnList validTxns = new ValidReadTxnList("1:" + Long.MAX_VALUE + "::");
String asString = validTxns.toString();
- Assert.assertEquals("1:" + Long.MAX_VALUE + ":", asString);
+ Assert.assertEquals("1:" + Long.MAX_VALUE + "::", asString);
validTxns = new ValidReadTxnList(asString);
Assert.assertEquals(1, validTxns.getHighWatermark());
Assert.assertNotNull(validTxns.getInvalidTransactions());
Assert.assertEquals(0, validTxns.getInvalidTransactions().length);
asString = validTxns.toString();
- Assert.assertEquals("1:" + Long.MAX_VALUE + ":", asString);
+ Assert.assertEquals("1:" + Long.MAX_VALUE + "::", asString);
validTxns = new ValidReadTxnList(asString);
Assert.assertEquals(1, validTxns.getHighWatermark());
Assert.assertNotNull(validTxns.getInvalidTransactions());
http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index f92db7c..e0c05bd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1332,6 +1332,16 @@ public class TestCompactor {
public boolean isValidBase(long txnid) {
return true;
}
+
+ @Override
+ public boolean isTxnAborted(long txnid) {
+ return true;
+ }
+
+ @Override
+ public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) {
+ return RangeResponse.ALL;
+ }
};
OrcInputFormat aif = new OrcInputFormat();
http://git-wip-us.apache.org/repos/asf/hive/blob/6af51245/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index ff66836..ca6a007 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -636,8 +636,9 @@ struct GetOpenTxnsInfoResponse {
struct GetOpenTxnsResponse {
1: required i64 txn_high_water_mark,
- 2: required set<i64> open_txns,
+ 2: required list<i64> open_txns, // set<i64> changed to list<i64> since 3.0
3: optional i64 min_open_txn, //since 1.3,2.2
+ 4: required binary abortedBits, // since 3.0
}
struct OpenTxnRequest {