You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/07/29 22:36:24 UTC
[2/2] hive git commit: HIVE-14350 Aborted txns cause false positive
"Not enough history available..." msgs (Eugene Koifman,
reviewed by Alan Gates)
HIVE-14350 Aborted txns cause false positive "Not enough history available..." msgs (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8a183766
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8a183766
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8a183766
Branch: refs/heads/master
Commit: 8a1837665ce1a1f129e1a923dd02866f8b7ffba6
Parents: 541fcb8
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Jul 29 14:18:23 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Jul 29 14:19:02 2016 -0700
----------------------------------------------------------------------
.../hive/common/ValidCompactorTxnList.java | 109 +++++++------------
.../hadoop/hive/common/ValidReadTxnList.java | 40 ++++++-
.../apache/hadoop/hive/common/ValidTxnList.java | 7 ++
.../hive/common/TestValidReadTxnList.java | 12 +-
.../hive/ql/txn/compactor/TestCompactor.java | 4 +
metastore/if/hive_metastore.thrift | 1 +
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 25 +++++
.../gen/thrift/gen-cpp/hive_metastore_types.h | 15 ++-
.../hive/metastore/api/GetOpenTxnsResponse.java | 105 +++++++++++++++++-
.../src/gen/thrift/gen-php/metastore/Types.php | 23 ++++
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 15 ++-
.../gen/thrift/gen-rb/hive_metastore_types.rb | 4 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 16 ++-
.../hadoop/hive/metastore/txn/TxnUtils.java | 28 +++--
.../txn/TestValidCompactorTxnList.java | 40 ++++---
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 2 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 11 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 16 +++
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 35 +++---
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 8 +-
20 files changed, 375 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
index ad79e2c..334b93e 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
@@ -18,94 +18,61 @@
package org.apache.hadoop.hive.common;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-
import java.util.Arrays;
/**
- * And implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
- * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
- * is committed or aborted. Additionally it will return none if there are any open transactions
- * below the max transaction given, since we don't want to compact above open transactions. For
- * {@link #isTxnValid} it will still view a transaction as valid only if it is committed. These
- * produce the logic we need to assure that the compactor only sees records less than the lowest
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ *
+ * Compaction should only include txns up to the smallest open txn (exclusive).
+ * There may be aborted txns in the snapshot represented by this ValidCompactorTxnList.
+ * Thus {@link #isTxnRangeValid(long, long)} returns NONE for any range that includes any unresolved
+ * transactions. Any txn above {@code highWatermark} is unresolved.
+ * These produce the logic we need to assure that the compactor only sees records less than the lowest
* open transaction when choosing which files to compact, but that it still ignores aborted
* records when compacting.
+ *
+ * See {@link org.apache.hadoop.hive.metastore.txn.TxnUtils#createValidCompactTxnList()} for proper
+ * way to construct this.
*/
public class ValidCompactorTxnList extends ValidReadTxnList {
- //TODO: refactor this - minOpenTxn is not needed if we set
- // highWatermark = Math.min(highWaterMark, minOpenTxn) (assuming there are open txns)
-
- // The minimum open transaction id
- private long minOpenTxn;
-
public ValidCompactorTxnList() {
super();
- minOpenTxn = -1;
}
-
/**
- *
- * @param exceptions list of all open and aborted transactions
- * @param minOpen lowest open transaction
- * @param highWatermark highest committed transaction
+ * @param abortedTxnList list of all aborted transactions
+ * @param highWatermark highest committed transaction to be considered for compaction,
+ * equivalently (lowest_open_txn - 1).
*/
- public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) {
- super(exceptions, highWatermark);
- minOpenTxn = minOpen;
+ public ValidCompactorTxnList(long[] abortedTxnList, long highWatermark) {
+ super(abortedTxnList, highWatermark);
+ if(this.exceptions.length <= 0) {
+ return;
+ }
+ //now that exceptions (aka abortedTxnList) is sorted
+ int idx = Arrays.binarySearch(this.exceptions, highWatermark);
+ int lastElementPos;
+ if(idx < 0) {
+ int insertionPoint = -idx - 1 ;//see Arrays.binarySearch() JavaDoc
+ lastElementPos = insertionPoint - 1;
+ }
+ else {
+ lastElementPos = idx;
+ }
+ /**
+ * ensure that we throw out any exceptions above highWatermark to make
+ * {@link #isTxnValid(long)} faster
+ */
+ this.exceptions = Arrays.copyOf(this.exceptions, lastElementPos + 1);
}
-
public ValidCompactorTxnList(String value) {
super(value);
}
-
+ /**
+ * Returns {@link org.apache.hadoop.hive.common.ValidTxnList.RangeResponse#ALL} if all txns in
+ * the range are resolved and {@link org.apache.hadoop.hive.common.ValidTxnList.RangeResponse#NONE} otherwise.
+ */
@Override
public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
- if (highWatermark < minTxnId) {
- return RangeResponse.NONE;
- } else if (minOpenTxn < 0) {
- return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
- } else {
- return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
- }
- }
-
- @Override
- public String writeToString() {
- StringBuilder buf = new StringBuilder();
- buf.append(highWatermark);
- buf.append(':');
- buf.append(minOpenTxn);
- if (exceptions.length == 0) {
- buf.append(':');
- } else {
- for(long except: exceptions) {
- buf.append(':');
- buf.append(except);
- }
- }
- return buf.toString();
- }
-
- @Override
- public void readFromString(String src) {
- if (src == null || src.length() == 0) {
- highWatermark = Long.MAX_VALUE;
- exceptions = new long[0];
- } else {
- String[] values = src.split(":");
- highWatermark = Long.parseLong(values[0]);
- minOpenTxn = Long.parseLong(values[1]);
- exceptions = new long[values.length - 2];
- for(int i = 2; i < values.length; ++i) {
- exceptions[i-2] = Long.parseLong(values[i]);
- }
- }
- }
-
- @VisibleForTesting
- public long getMinOpenTxn() {
- return minOpenTxn;
+ return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index fda242d..2f35917 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -18,28 +18,43 @@
package org.apache.hadoop.hive.common;
+import com.google.common.annotations.VisibleForTesting;
+
import java.util.Arrays;
/**
- * An implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers.
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers.
* This class will view a transaction as valid only if it is committed. Both open and aborted
* transactions will be seen as invalid.
*/
public class ValidReadTxnList implements ValidTxnList {
protected long[] exceptions;
+ //default value means there are no open txns in the snapshot
+ private long minOpenTxn = Long.MAX_VALUE;
protected long highWatermark;
public ValidReadTxnList() {
- this(new long[0], Long.MAX_VALUE);
+ this(new long[0], Long.MAX_VALUE, Long.MAX_VALUE);
}
+ /**
+ * Used if there are no open transactions in the snapshot
+ */
public ValidReadTxnList(long[] exceptions, long highWatermark) {
+ this(exceptions, highWatermark, Long.MAX_VALUE);
+ }
+ public ValidReadTxnList(long[] exceptions, long highWatermark, long minOpenTxn) {
if (exceptions.length == 0) {
this.exceptions = exceptions;
} else {
this.exceptions = exceptions.clone();
Arrays.sort(this.exceptions);
+ this.minOpenTxn = minOpenTxn;
+ if(this.exceptions[0] <= 0) {
+ //should never happen of course
+ throw new IllegalArgumentException("Invalid txnid: " + this.exceptions[0] + " found");
+ }
}
this.highWatermark = highWatermark;
}
@@ -56,6 +71,14 @@ public class ValidReadTxnList implements ValidTxnList {
return Arrays.binarySearch(exceptions, txnid) < 0;
}
+ /**
+ * We cannot use a base file if its range contains an open txn.
+ * @param txnid from base_xxxx
+ */
+ @Override
+ public boolean isValidBase(long txnid) {
+ return minOpenTxn > txnid && txnid <= highWatermark;
+ }
@Override
public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
// check the easy cases first
@@ -92,6 +115,8 @@ public class ValidReadTxnList implements ValidTxnList {
public String writeToString() {
StringBuilder buf = new StringBuilder();
buf.append(highWatermark);
+ buf.append(':');
+ buf.append(minOpenTxn);
if (exceptions.length == 0) {
buf.append(':');
} else {
@@ -111,9 +136,10 @@ public class ValidReadTxnList implements ValidTxnList {
} else {
String[] values = src.split(":");
highWatermark = Long.parseLong(values[0]);
- exceptions = new long[values.length - 1];
- for(int i = 1; i < values.length; ++i) {
- exceptions[i-1] = Long.parseLong(values[i]);
+ minOpenTxn = Long.parseLong(values[1]);
+ exceptions = new long[values.length - 2];
+ for(int i = 2; i < values.length; ++i) {
+ exceptions[i-2] = Long.parseLong(values[i]);
}
}
}
@@ -127,5 +153,9 @@ public class ValidReadTxnList implements ValidTxnList {
public long[] getInvalidTransactions() {
return exceptions;
}
+ @VisibleForTesting
+ public long getMinOpenTxn() {
+ return minOpenTxn;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
index 87e7e30..5e1e4ee 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
@@ -47,6 +47,13 @@ public interface ValidTxnList {
public boolean isTxnValid(long txnid);
/**
+ * Returns {@code true} if the given base file can be used to materialize the snapshot represented by
+ * this {@code ValidTxnList}.
+ * @param txnid highest txn in a given base_xxxx file
+ */
+ public boolean isValidBase(long txnid);
+
+ /**
* Find out if a range of transaction ids are valid. Note that valid may have different meanings
* for different implementations, as some will only want to see committed transactions and some
* both committed and aborted.
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java b/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
index d3c6803..6661158 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
@@ -34,9 +34,9 @@ public class TestValidReadTxnList {
@Test
public void noExceptions() throws Exception {
- ValidTxnList txnList = new ValidReadTxnList(new long[0], 1);
+ ValidTxnList txnList = new ValidReadTxnList(new long[0], 1, Long.MAX_VALUE);
String str = txnList.writeToString();
- Assert.assertEquals("1:", str);
+ Assert.assertEquals("1:" + Long.MAX_VALUE + ":", str);
ValidTxnList newList = new ValidReadTxnList();
newList.readFromString(str);
Assert.assertTrue(newList.isTxnValid(1));
@@ -45,9 +45,9 @@ public class TestValidReadTxnList {
@Test
public void exceptions() throws Exception {
- ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5);
+ ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5, 4L);
String str = txnList.writeToString();
- Assert.assertEquals("5:2:4", str);
+ Assert.assertEquals("5:4:2:4", str);
ValidTxnList newList = new ValidReadTxnList();
newList.readFromString(str);
Assert.assertTrue(newList.isTxnValid(1));
@@ -62,7 +62,7 @@ public class TestValidReadTxnList {
public void longEnoughToCompress() throws Exception {
long[] exceptions = new long[1000];
for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
- ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000);
+ ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900);
String str = txnList.writeToString();
ValidTxnList newList = new ValidReadTxnList();
newList.readFromString(str);
@@ -76,7 +76,7 @@ public class TestValidReadTxnList {
public void readWriteConfig() throws Exception {
long[] exceptions = new long[1000];
for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
- ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000);
+ ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900);
String str = txnList.writeToString();
Configuration conf = new Configuration();
conf.set(ValidTxnList.VALID_TXNS_KEY, str);
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index ca2a912..731caa8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1038,6 +1038,10 @@ public class TestCompactor {
public long[] getInvalidTransactions() {
return new long[0];
}
+ @Override
+ public boolean isValidBase(long txnid) {
+ return true;
+ }
};
OrcInputFormat aif = new OrcInputFormat();
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index 4d92b73..a2e35b8 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -635,6 +635,7 @@ struct GetOpenTxnsInfoResponse {
struct GetOpenTxnsResponse {
1: required i64 txn_high_water_mark,
2: required set<i64> open_txns,
+ 3: optional i64 min_open_txn, //since 1.3,2.2
}
struct OpenTxnRequest {
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 79460a8..174b539 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -11911,6 +11911,11 @@ void GetOpenTxnsResponse::__set_open_txns(const std::set<int64_t> & val) {
this->open_txns = val;
}
+void GetOpenTxnsResponse::__set_min_open_txn(const int64_t val) {
+ this->min_open_txn = val;
+__isset.min_open_txn = true;
+}
+
uint32_t GetOpenTxnsResponse::read(::apache::thrift::protocol::TProtocol* iprot) {
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -11963,6 +11968,14 @@ uint32_t GetOpenTxnsResponse::read(::apache::thrift::protocol::TProtocol* iprot)
xfer += iprot->skip(ftype);
}
break;
+ case 3:
+ if (ftype == ::apache::thrift::protocol::T_I64) {
+ xfer += iprot->readI64(this->min_open_txn);
+ this->__isset.min_open_txn = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -12000,6 +12013,11 @@ uint32_t GetOpenTxnsResponse::write(::apache::thrift::protocol::TProtocol* oprot
}
xfer += oprot->writeFieldEnd();
+ if (this->__isset.min_open_txn) {
+ xfer += oprot->writeFieldBegin("min_open_txn", ::apache::thrift::protocol::T_I64, 3);
+ xfer += oprot->writeI64(this->min_open_txn);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -12009,15 +12027,21 @@ void swap(GetOpenTxnsResponse &a, GetOpenTxnsResponse &b) {
using ::std::swap;
swap(a.txn_high_water_mark, b.txn_high_water_mark);
swap(a.open_txns, b.open_txns);
+ swap(a.min_open_txn, b.min_open_txn);
+ swap(a.__isset, b.__isset);
}
GetOpenTxnsResponse::GetOpenTxnsResponse(const GetOpenTxnsResponse& other524) {
txn_high_water_mark = other524.txn_high_water_mark;
open_txns = other524.open_txns;
+ min_open_txn = other524.min_open_txn;
+ __isset = other524.__isset;
}
GetOpenTxnsResponse& GetOpenTxnsResponse::operator=(const GetOpenTxnsResponse& other525) {
txn_high_water_mark = other525.txn_high_water_mark;
open_txns = other525.open_txns;
+ min_open_txn = other525.min_open_txn;
+ __isset = other525.__isset;
return *this;
}
void GetOpenTxnsResponse::printTo(std::ostream& out) const {
@@ -12025,6 +12049,7 @@ void GetOpenTxnsResponse::printTo(std::ostream& out) const {
out << "GetOpenTxnsResponse(";
out << "txn_high_water_mark=" << to_string(txn_high_water_mark);
out << ", " << "open_txns=" << to_string(open_txns);
+ out << ", " << "min_open_txn="; (__isset.min_open_txn ? (out << to_string(min_open_txn)) : (out << "<null>"));
out << ")";
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
index ec81798..bfec694 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -4881,29 +4881,42 @@ inline std::ostream& operator<<(std::ostream& out, const GetOpenTxnsInfoResponse
return out;
}
+typedef struct _GetOpenTxnsResponse__isset {
+ _GetOpenTxnsResponse__isset() : min_open_txn(false) {}
+ bool min_open_txn :1;
+} _GetOpenTxnsResponse__isset;
class GetOpenTxnsResponse {
public:
GetOpenTxnsResponse(const GetOpenTxnsResponse&);
GetOpenTxnsResponse& operator=(const GetOpenTxnsResponse&);
- GetOpenTxnsResponse() : txn_high_water_mark(0) {
+ GetOpenTxnsResponse() : txn_high_water_mark(0), min_open_txn(0) {
}
virtual ~GetOpenTxnsResponse() throw();
int64_t txn_high_water_mark;
std::set<int64_t> open_txns;
+ int64_t min_open_txn;
+
+ _GetOpenTxnsResponse__isset __isset;
void __set_txn_high_water_mark(const int64_t val);
void __set_open_txns(const std::set<int64_t> & val);
+ void __set_min_open_txn(const int64_t val);
+
bool operator == (const GetOpenTxnsResponse & rhs) const
{
if (!(txn_high_water_mark == rhs.txn_high_water_mark))
return false;
if (!(open_txns == rhs.open_txns))
return false;
+ if (__isset.min_open_txn != rhs.__isset.min_open_txn)
+ return false;
+ else if (__isset.min_open_txn && !(min_open_txn == rhs.min_open_txn))
+ return false;
return true;
}
bool operator != (const GetOpenTxnsResponse &rhs) const {
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java
index 6986fc2..8230d38 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java
@@ -40,6 +40,7 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
private static final org.apache.thrift.protocol.TField TXN_HIGH_WATER_MARK_FIELD_DESC = new org.apache.thrift.protocol.TField("txn_high_water_mark", org.apache.thrift.protocol.TType.I64, (short)1);
private static final org.apache.thrift.protocol.TField OPEN_TXNS_FIELD_DESC = new org.apache.thrift.protocol.TField("open_txns", org.apache.thrift.protocol.TType.SET, (short)2);
+ private static final org.apache.thrift.protocol.TField MIN_OPEN_TXN_FIELD_DESC = new org.apache.thrift.protocol.TField("min_open_txn", org.apache.thrift.protocol.TType.I64, (short)3);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -49,11 +50,13 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
private long txn_high_water_mark; // required
private Set<Long> open_txns; // required
+ private long min_open_txn; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
TXN_HIGH_WATER_MARK((short)1, "txn_high_water_mark"),
- OPEN_TXNS((short)2, "open_txns");
+ OPEN_TXNS((short)2, "open_txns"),
+ MIN_OPEN_TXN((short)3, "min_open_txn");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -72,6 +75,8 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
return TXN_HIGH_WATER_MARK;
case 2: // OPEN_TXNS
return OPEN_TXNS;
+ case 3: // MIN_OPEN_TXN
+ return MIN_OPEN_TXN;
default:
return null;
}
@@ -113,7 +118,9 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
// isset id assignments
private static final int __TXN_HIGH_WATER_MARK_ISSET_ID = 0;
+ private static final int __MIN_OPEN_TXN_ISSET_ID = 1;
private byte __isset_bitfield = 0;
+ private static final _Fields optionals[] = {_Fields.MIN_OPEN_TXN};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -122,6 +129,8 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
tmpMap.put(_Fields.OPEN_TXNS, new org.apache.thrift.meta_data.FieldMetaData("open_txns", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.SetMetaData(org.apache.thrift.protocol.TType.SET,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))));
+ tmpMap.put(_Fields.MIN_OPEN_TXN, new org.apache.thrift.meta_data.FieldMetaData("min_open_txn", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GetOpenTxnsResponse.class, metaDataMap);
}
@@ -149,6 +158,7 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
Set<Long> __this__open_txns = new HashSet<Long>(other.open_txns);
this.open_txns = __this__open_txns;
}
+ this.min_open_txn = other.min_open_txn;
}
public GetOpenTxnsResponse deepCopy() {
@@ -160,6 +170,8 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
setTxn_high_water_markIsSet(false);
this.txn_high_water_mark = 0;
this.open_txns = null;
+ setMin_open_txnIsSet(false);
+ this.min_open_txn = 0;
}
public long getTxn_high_water_mark() {
@@ -222,6 +234,28 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
}
}
+ public long getMin_open_txn() {
+ return this.min_open_txn;
+ }
+
+ public void setMin_open_txn(long min_open_txn) {
+ this.min_open_txn = min_open_txn;
+ setMin_open_txnIsSet(true);
+ }
+
+ public void unsetMin_open_txn() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID);
+ }
+
+ /** Returns true if field min_open_txn is set (has been assigned a value) and false otherwise */
+ public boolean isSetMin_open_txn() {
+ return EncodingUtils.testBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID);
+ }
+
+ public void setMin_open_txnIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case TXN_HIGH_WATER_MARK:
@@ -240,6 +274,14 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
}
break;
+ case MIN_OPEN_TXN:
+ if (value == null) {
+ unsetMin_open_txn();
+ } else {
+ setMin_open_txn((Long)value);
+ }
+ break;
+
}
}
@@ -251,6 +293,9 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
case OPEN_TXNS:
return getOpen_txns();
+ case MIN_OPEN_TXN:
+ return getMin_open_txn();
+
}
throw new IllegalStateException();
}
@@ -266,6 +311,8 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
return isSetTxn_high_water_mark();
case OPEN_TXNS:
return isSetOpen_txns();
+ case MIN_OPEN_TXN:
+ return isSetMin_open_txn();
}
throw new IllegalStateException();
}
@@ -301,6 +348,15 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
return false;
}
+ boolean this_present_min_open_txn = true && this.isSetMin_open_txn();
+ boolean that_present_min_open_txn = true && that.isSetMin_open_txn();
+ if (this_present_min_open_txn || that_present_min_open_txn) {
+ if (!(this_present_min_open_txn && that_present_min_open_txn))
+ return false;
+ if (this.min_open_txn != that.min_open_txn)
+ return false;
+ }
+
return true;
}
@@ -318,6 +374,11 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
if (present_open_txns)
list.add(open_txns);
+ boolean present_min_open_txn = true && (isSetMin_open_txn());
+ list.add(present_min_open_txn);
+ if (present_min_open_txn)
+ list.add(min_open_txn);
+
return list.hashCode();
}
@@ -349,6 +410,16 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetMin_open_txn()).compareTo(other.isSetMin_open_txn());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetMin_open_txn()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.min_open_txn, other.min_open_txn);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -380,6 +451,12 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
sb.append(this.open_txns);
}
first = false;
+ if (isSetMin_open_txn()) {
+ if (!first) sb.append(", ");
+ sb.append("min_open_txn:");
+ sb.append(this.min_open_txn);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -459,6 +536,14 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 3: // MIN_OPEN_TXN
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.min_open_txn = iprot.readI64();
+ struct.setMin_open_txnIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -487,6 +572,11 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
}
oprot.writeFieldEnd();
}
+ if (struct.isSetMin_open_txn()) {
+ oprot.writeFieldBegin(MIN_OPEN_TXN_FIELD_DESC);
+ oprot.writeI64(struct.min_open_txn);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -512,6 +602,14 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
oprot.writeI64(_iter472);
}
}
+ BitSet optionals = new BitSet();
+ if (struct.isSetMin_open_txn()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetMin_open_txn()) {
+ oprot.writeI64(struct.min_open_txn);
+ }
}
@Override
@@ -530,6 +628,11 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
}
}
struct.setOpen_txnsIsSet(true);
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.min_open_txn = iprot.readI64();
+ struct.setMin_open_txnIsSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php
index f505208..d6f7f49 100644
--- a/metastore/src/gen/thrift/gen-php/metastore/Types.php
+++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php
@@ -12002,6 +12002,10 @@ class GetOpenTxnsResponse {
* @var int[]
*/
public $open_txns = null;
+ /**
+ * @var int
+ */
+ public $min_open_txn = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -12018,6 +12022,10 @@ class GetOpenTxnsResponse {
'type' => TType::I64,
),
),
+ 3 => array(
+ 'var' => 'min_open_txn',
+ 'type' => TType::I64,
+ ),
);
}
if (is_array($vals)) {
@@ -12027,6 +12035,9 @@ class GetOpenTxnsResponse {
if (isset($vals['open_txns'])) {
$this->open_txns = $vals['open_txns'];
}
+ if (isset($vals['min_open_txn'])) {
+ $this->min_open_txn = $vals['min_open_txn'];
+ }
}
}
@@ -12077,6 +12088,13 @@ class GetOpenTxnsResponse {
$xfer += $input->skip($ftype);
}
break;
+ case 3:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->min_open_txn);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -12116,6 +12134,11 @@ class GetOpenTxnsResponse {
}
$xfer += $output->writeFieldEnd();
}
+ if ($this->min_open_txn !== null) {
+ $xfer += $output->writeFieldBegin('min_open_txn', TType::I64, 3);
+ $xfer += $output->writeI64($this->min_open_txn);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 8d88cd7..2d308c9 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -8264,17 +8264,20 @@ class GetOpenTxnsResponse:
Attributes:
- txn_high_water_mark
- open_txns
+ - min_open_txn
"""
thrift_spec = (
None, # 0
(1, TType.I64, 'txn_high_water_mark', None, None, ), # 1
(2, TType.SET, 'open_txns', (TType.I64,None), None, ), # 2
+ (3, TType.I64, 'min_open_txn', None, None, ), # 3
)
- def __init__(self, txn_high_water_mark=None, open_txns=None,):
+ def __init__(self, txn_high_water_mark=None, open_txns=None, min_open_txn=None,):
self.txn_high_water_mark = txn_high_water_mark
self.open_txns = open_txns
+ self.min_open_txn = min_open_txn
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -8300,6 +8303,11 @@ class GetOpenTxnsResponse:
iprot.readSetEnd()
else:
iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I64:
+ self.min_open_txn = iprot.readI64()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -8321,6 +8329,10 @@ class GetOpenTxnsResponse:
oprot.writeI64(iter419)
oprot.writeSetEnd()
oprot.writeFieldEnd()
+ if self.min_open_txn is not None:
+ oprot.writeFieldBegin('min_open_txn', TType.I64, 3)
+ oprot.writeI64(self.min_open_txn)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -8336,6 +8348,7 @@ class GetOpenTxnsResponse:
value = 17
value = (value * 31) ^ hash(self.txn_high_water_mark)
value = (value * 31) ^ hash(self.open_txns)
+ value = (value * 31) ^ hash(self.min_open_txn)
return value
def __repr__(self):
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 0964cd8..bd94e98 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -1843,10 +1843,12 @@ class GetOpenTxnsResponse
include ::Thrift::Struct, ::Thrift::Struct_Union
TXN_HIGH_WATER_MARK = 1
OPEN_TXNS = 2
+ MIN_OPEN_TXN = 3
FIELDS = {
TXN_HIGH_WATER_MARK => {:type => ::Thrift::Types::I64, :name => 'txn_high_water_mark'},
- OPEN_TXNS => {:type => ::Thrift::Types::SET, :name => 'open_txns', :element => {:type => ::Thrift::Types::I64}}
+ OPEN_TXNS => {:type => ::Thrift::Types::SET, :name => 'open_txns', :element => {:type => ::Thrift::Types::I64}},
+ MIN_OPEN_TXN => {:type => ::Thrift::Types::I64, :name => 'min_open_txn', :optional => true}
}
def struct_fields; FIELDS; end
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b121644..e8c5fac 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -361,15 +361,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
close(rs);
Set<Long> openList = new HashSet<Long>();
//need the WHERE clause below to ensure consistent results with READ_COMMITTED
- s = "select txn_id from TXNS where txn_id <= " + hwm;
+ s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm;
LOG.debug("Going to execute query<" + s + ">");
rs = stmt.executeQuery(s);
+ long minOpenTxn = Long.MAX_VALUE;
while (rs.next()) {
- openList.add(rs.getLong(1));
+ long txnId = rs.getLong(1);
+ openList.add(txnId);
+ char c = rs.getString(2).charAt(0);
+ if(c == TXN_OPEN) {
+ minOpenTxn = Math.min(minOpenTxn, txnId);
+ }
}
LOG.debug("Going to rollback");
dbConn.rollback();
- return new GetOpenTxnsResponse(hwm, openList);
+ GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList);
+ if(minOpenTxn < Long.MAX_VALUE) {
+ otr.setMin_open_txn(minOpenTxn);
+ }
+ return otr;
} catch (SQLException e) {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 39b18ac..2ffa1da 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -57,31 +58,42 @@ public class TxnUtils {
if (currentTxn > 0 && currentTxn == txn) continue;
exceptions[i++] = txn;
}
- return new ValidReadTxnList(exceptions, highWater);
+ if(txns.isSetMin_open_txn()) {
+ return new ValidReadTxnList(exceptions, highWater, txns.getMin_open_txn());
+ }
+ else {
+ return new ValidReadTxnList(exceptions, highWater);
+ }
}
/**
* Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
* {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
* compact the files, and thus treats only open transactions as invalid. Additionally any
- * txnId > highestOpenTxnId is also invalid. This is avoid creating something like
+ * txnId >= minOpenTxnId is also invalid. This is to avoid creating something like
* delta_17_120 where txnId 80, for example, is still open.
* @param txns txn list from the metastore
* @return a valid txn list.
*/
public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
- //todo: this could be more efficient: using select min(txn_id) from TXNS where txn_state=" +
- // quoteChar(TXN_OPEN) to compute compute HWM...
long highWater = txns.getTxn_high_water_mark();
long minOpenTxn = Long.MAX_VALUE;
long[] exceptions = new long[txns.getOpen_txnsSize()];
int i = 0;
for (TxnInfo txn : txns.getOpen_txns()) {
- if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
- exceptions[i++] = txn.getId();//todo: only add Aborted
- }//remove all exceptions < minOpenTxn
+ if (txn.getState() == TxnState.OPEN) {
+ minOpenTxn = Math.min(minOpenTxn, txn.getId());
+ }
+ else {
+ //only need aborted since we don't consider anything above minOpenTxn
+ exceptions[i++] = txn.getId();
+ }
+ }
+ if(i < exceptions.length) {
+ exceptions = Arrays.copyOf(exceptions, i);
+ }
highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
- return new ValidCompactorTxnList(exceptions, -1, highWater);
+ return new ValidCompactorTxnList(exceptions, highWater);
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
index c249854..79ccc6b 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
@@ -26,66 +26,74 @@ public class TestValidCompactorTxnList {
@Test
public void minTxnHigh() {
- ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 4}, 3, 5);
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 4}, 2);
ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
}
@Test
public void maxTxnLow() {
- ValidTxnList txns = new ValidCompactorTxnList(new long[]{13, 14}, 13, 15);
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{13, 14}, 12);
ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp);
}
@Test
public void minTxnHighNoExceptions() {
- ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 5);
+ ValidTxnList txns = new ValidCompactorTxnList(new long[0], 5);
ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
}
@Test
public void maxTxnLowNoExceptions() {
- ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 15);
+ ValidTxnList txns = new ValidCompactorTxnList(new long[0], 15);
ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp);
}
@Test
public void exceptionsAllBelow() {
- ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 6}, 3, 15);
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 6}, 3);
ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
}
@Test
public void exceptionsInMidst() {
- ValidTxnList txns = new ValidCompactorTxnList(new long[]{8}, 8, 15);
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{8}, 7);
ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
}
+ @Test
+ public void exceptionsAbveHighWaterMark() {
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{8, 11, 17, 29}, 15);
+ Assert.assertArrayEquals("", new long[]{8, 11}, txns.getInvalidTransactions());
+ ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+ Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp);
+ rsp = txns.isTxnRangeValid(12, 16);
+ Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
+ }
@Test
public void writeToString() {
- ValidTxnList txns = new ValidCompactorTxnList(new long[]{9, 7, 10}, 9, 37);
- Assert.assertEquals("37:9:7:9:10", txns.writeToString());
+ ValidTxnList txns = new ValidCompactorTxnList(new long[]{9, 7, 10, Long.MAX_VALUE}, 8);
+ Assert.assertEquals("8:" + Long.MAX_VALUE + ":7", txns.writeToString());
txns = new ValidCompactorTxnList();
- Assert.assertEquals(Long.toString(Long.MAX_VALUE) + ":-1:", txns.writeToString());
- txns = new ValidCompactorTxnList(new long[0], -1, 23);
- Assert.assertEquals("23:-1:", txns.writeToString());
+ Assert.assertEquals(Long.toString(Long.MAX_VALUE) + ":" + Long.MAX_VALUE + ":", txns.writeToString());
+ txns = new ValidCompactorTxnList(new long[0], 23);
+ Assert.assertEquals("23:" + Long.MAX_VALUE + ":", txns.writeToString());
}
@Test
public void readFromString() {
- ValidCompactorTxnList txns = new ValidCompactorTxnList("37:9:7:9:10");
+ ValidCompactorTxnList txns = new ValidCompactorTxnList("37:" + Long.MAX_VALUE + ":7:9:10");
Assert.assertEquals(37L, txns.getHighWatermark());
- Assert.assertEquals(9L, txns.getMinOpenTxn());
+ Assert.assertEquals(Long.MAX_VALUE, txns.getMinOpenTxn());
Assert.assertArrayEquals(new long[]{7L, 9L, 10L}, txns.getInvalidTransactions());
- txns = new ValidCompactorTxnList("21:-1:");
+ txns = new ValidCompactorTxnList("21:" + Long.MAX_VALUE + ":");
Assert.assertEquals(21L, txns.getHighWatermark());
- Assert.assertEquals(-1L, txns.getMinOpenTxn());
+ Assert.assertEquals(Long.MAX_VALUE, txns.getMinOpenTxn());
Assert.assertEquals(0, txns.getInvalidTransactions().length);
-
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index c150ec5..449d889 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -634,7 +634,7 @@ public class AcidUtils {
//By definition there are no open txns with id < 1.
return true;
}
- return ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeValid(1, baseTxnId);
+ return txnList.isValidBase(baseTxnId);
}
private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0a2c3fa..20e5465 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -604,9 +604,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
footerCache = useExternalCache ? metaCache : localCache;
}
}
- String value = conf.get(ValidTxnList.VALID_TXNS_KEY,
- Long.MAX_VALUE + ":");
- transactionList = new ValidReadTxnList(value);
+ String value = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ transactionList = value == null ? new ValidReadTxnList() : new ValidReadTxnList(value);
}
@VisibleForTesting
@@ -1806,9 +1805,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
bucket = (int) split.getStart();
reader = null;
}
- String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
- Long.MAX_VALUE + ":");
- ValidTxnList validTxnList = new ValidReadTxnList(txnString);
+ String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ ValidTxnList validTxnList = txnString == null ? new ValidReadTxnList() :
+ new ValidReadTxnList(txnString);
final OrcRawRecordMerger records =
new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
validTxnList, readOptions, deltas);
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index e796250..af192fb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1206,6 +1206,22 @@ public class TestTxnCommands2 {
}
/**
+ * Make sure aborted txns don't cause a valid base_xxxx to be rejected with a false
+ * "Not enough history available..." error (HIVE-14350)
+ */
+ @Test
+ public void testNoHistory() throws Exception {
+ int[][] tableData = {{1,2},{3,4}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+ runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+ runWorker(hiveConf);
+ runCleaner(hiveConf);
+ runStatementOnDriver("select count(*) from " + Table.ACIDTBL);
+ }
+ /**
* takes raw data and turns it into a string as if from Driver.getResults()
* sorts rows in dictionary order
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index b83cea4..556df18 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -121,7 +121,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0]));
AcidUtils.Directory dir =
AcidUtils.getAcidState(new MockPath(fs, "/tbl/part1"), conf,
- new ValidReadTxnList("100:"));
+ new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
assertEquals(null, dir.getBaseDirectory());
assertEquals(0, dir.getCurrentDirectories().size());
assertEquals(0, dir.getObsolete().size());
@@ -152,7 +152,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_101_101/bucket_0", 0, new byte[0]));
AcidUtils.Directory dir =
AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
- "mock:/tbl/part1"), conf, new ValidReadTxnList("100:"));
+ "mock:/tbl/part1"), conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
assertEquals(null, dir.getBaseDirectory());
List<FileStatus> obsolete = dir.getObsolete();
assertEquals(2, obsolete.size());
@@ -194,7 +194,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_90_120/bucket_0", 0, new byte[0]));
AcidUtils.Directory dir =
AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
- "mock:/tbl/part1"), conf, new ValidReadTxnList("100:"));
+ "mock:/tbl/part1"), conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
List<FileStatus> obsolete = dir.getObsolete();
assertEquals(5, obsolete.size());
@@ -225,7 +225,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "/tbl/part1");
AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:" + Long.MAX_VALUE + ":"));
assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString());
assertEquals(1, dir.getCurrentDirectories().size());
assertEquals("mock:/tbl/part1/delta_120_130",
@@ -238,7 +238,7 @@ public class TestAcidUtils {
assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString());
assertEquals(0, dir.getOriginalFiles().size());
- dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:"));
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:" + Long.MAX_VALUE + ":"));
assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
assertEquals(0, dir.getCurrentDirectories().size());
obsoletes = dir.getObsolete();
@@ -250,8 +250,9 @@ public class TestAcidUtils {
the existence of delta_120_130 implies that 121 in the exception list is aborted unless
delta_120_130 is from streaming ingest in which case 121 can be open
(and thus 122-130 are open too)
+ 99 here would be Aborted since 121 is minOpenTxn, base_100 is still good
For multi-statment txns, see HIVE-13369*/
- dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121"));
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121:99:121"));
assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString());
assertEquals(1, dir.getCurrentDirectories().size());
assertEquals("mock:/tbl/part1/delta_120_130",
@@ -265,7 +266,7 @@ public class TestAcidUtils {
boolean gotException = false;
try {
- dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5"));
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5:5"));
}
catch(IOException e) {
gotException = true;
@@ -282,7 +283,7 @@ public class TestAcidUtils {
part = new MockPath(fs, "/tbl/part1");
try {
gotException = false;
- dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7"));
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7:7"));
}
catch(IOException e) {
gotException = true;
@@ -298,7 +299,7 @@ public class TestAcidUtils {
part = new MockPath(fs, "/tbl/part1");
try {
gotException = false;
- dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7"));
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7:7"));
}
catch(IOException e) {
gotException = true;
@@ -314,7 +315,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
part = new MockPath(fs, "/tbl/part1");
//note that we don't include current txn of the client in exception list to read-you-writes
- dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:"));
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:" + Long.MAX_VALUE + ":"));
assertEquals("mock:/tbl/part1/base_" + Long.MIN_VALUE, dir.getBaseDirectory().toString());
assertEquals(1, dir.getCurrentDirectories().size());
assertEquals("mock:/tbl/part1/delta_1_1", dir.getCurrentDirectories().get(0).getPath().toString());
@@ -330,7 +331,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/000001_1", 500, new byte[0]));
Path part = new MockPath(fs, "/tbl/part1");
AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:" + Long.MAX_VALUE + ":"));
// Obsolete list should include the two original bucket files, and the old base dir
List<FileStatus> obsolete = dir.getObsolete();
assertEquals(3, obsolete.size());
@@ -351,7 +352,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "mock:/tbl/part1");
AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
List<FileStatus> obsolete = dir.getObsolete();
assertEquals(2, obsolete.size());
@@ -386,7 +387,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "mock:/tbl/part1");
AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
List<FileStatus> obsolete = dir.getObsolete();
assertEquals(5, obsolete.size());
@@ -411,7 +412,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "mock:/tbl/part1");
- AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4:4"));
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(2, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -432,7 +433,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]),
new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "mock:/tbl/part1");
- AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4:4"));
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(2, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -447,7 +448,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "mock:/tbl/part1");
AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4"));
+ AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("4:" + Long.MAX_VALUE));
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(1, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -464,7 +465,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "mock:/tbl/part1");
AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4"));
+ AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("3:" + Long.MAX_VALUE));
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(1, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index ddef4a2..f07aa49 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -311,7 +311,7 @@ public class TestOrcRawRecordMerger {
}
private static ValidTxnList createMaximalTxnList() {
- return new ValidReadTxnList(Long.MAX_VALUE + ":");
+ return new ValidReadTxnList();
}
@Test
@@ -517,7 +517,7 @@ public class TestOrcRawRecordMerger {
.maximumTransactionId(100).finalDestination(root);
of.getRecordUpdater(root, options).close(false);
- ValidTxnList txnList = new ValidReadTxnList("200:");
+ ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE);
AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
@@ -585,7 +585,7 @@ public class TestOrcRawRecordMerger {
ru.delete(200, new MyRow("", 8, 0, BUCKET));
ru.close(false);
- ValidTxnList txnList = new ValidReadTxnList("200:");
+ ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE);
AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
@@ -775,7 +775,7 @@ public class TestOrcRawRecordMerger {
merger.close();
// try ignoring the 200 transaction and make sure it works still
- ValidTxnList txns = new ValidReadTxnList("2000:200");
+ ValidTxnList txns = new ValidReadTxnList("2000:200:200");
merger =
new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
txns, new Reader.Options(),