You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/07/18 21:44:30 UTC
hive git commit: HIVE-13369 AcidUtils.getAcidState() is not paying
attention to ValidTxnList when choosing the best base file (Eugene Koifman,
reviewed by Owen O'Malley)
Repository: hive
Updated Branches:
refs/heads/master dfa1a5e1c -> 0896025e2
HIVE-13369 AcidUtils.getAcidState() is not paying attention to ValidTxnList when choosing the best base file (Eugene Koifman, reviewed by Owen O'Malley)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0896025e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0896025e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0896025e
Branch: refs/heads/master
Commit: 0896025e24869b6362c430a804c784b3a5a51e63
Parents: dfa1a5e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Jul 18 14:41:19 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Jul 18 14:41:19 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 2 +
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 60 +++++++++--
.../hive/ql/txn/AcidOpenTxnsCounterService.java | 8 +-
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 100 +++++++++++++++++--
4 files changed, 154 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0896025e/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 52dadb7..6ed5b13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -449,6 +449,8 @@ public enum ErrorMsg {
IMPORT_SEMANTIC_ERROR(10324, "Import Semantic Analyzer Error"),
INVALID_FK_SYNTAX(10325, "Invalid Foreign Key syntax"),
INVALID_PK_SYNTAX(10326, "Invalid Primary Key syntax"),
+ ACID_NOT_ENOUGH_HISTORY(10327, "Not enough history available for ({0},{1}). " +
+ "Oldest available base: {2}", true),
//========================== 20000 range starts here ========================//
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
http://git-wip-us.apache.org/repos/asf/hive/blob/0896025e/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 36f38f6..c150ec5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -475,7 +476,9 @@ public class AcidUtils {
/** State class for getChildState; cannot modify 2 things in a method. */
private static class TxnBase {
private FileStatus status;
- private long txn;
+ private long txn = 0;
+ private long oldestBaseTxnId = Long.MAX_VALUE;
+ private Path oldestBase = null;
}
/**
@@ -571,6 +574,21 @@ public class AcidUtils {
}
}
+ if(bestBase.oldestBase != null && bestBase.status == null) {
+ /**
+ * If here, there were one or more base_x directories but none was suitable for the given
+ * {@code txnList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus
+ * cannot have any data for an open txn. We could check that {@code deltas} has files covering
+ * [1,n] without gaps, but this would almost never happen...*/
+ //todo: this should only care about 'open' txns (HIVE-14211)
+ long[] exceptions = txnList.getInvalidTransactions();
+ String minOpenTxn = exceptions != null && exceptions.length > 0 ?
+ Long.toString(exceptions[0]) : "x";
+ throw new IOException(ErrorMsg.ACID_NOT_ENOUGH_HISTORY.format(
+ Long.toString(txnList.getHighWatermark()),
+ minOpenTxn, bestBase.oldestBase.toString()));
+ }
+
final Path base = bestBase.status == null ? null : bestBase.status.getPath();
LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
deltas.size());
@@ -598,7 +616,26 @@ public class AcidUtils {
}
};
}
-
+ /**
+ * Returns true if the base with id {@code baseTxnId} is usable by this reader, i.e. every txn in
+ * [1, baseTxnId] is valid from the point of view of {@code txnList}.
+ * A 'base' with an open txn in its range doesn't have 'enough history' to produce a correct
+ * snapshot for this reader. Such a base is NOT obsolete: obsolete files are those "covered" by
+ * other files within the snapshot. A base_Long.MIN_VALUE is always considered usable.
+ */
+ private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) {
+ /*This implementation is suboptimal: it treats both open and aborted txns as invalid, while we
+ * only care about 'open' ones. (Compaction removes any data that belongs to aborted txns and
+ * reads skip anything that belongs to an aborted txn, so base_7 is still OK if the only
+ * exception is txn 5 and txn 5 is aborted.) Hence this may reject a base that is actually
+ * usable (false positives, HIVE-14211). */
+ if(baseTxnId == Long.MIN_VALUE) {
+ //such a base is created by the 1st compaction after a non-acid to acid table conversion;
+ //by definition there are no open txns with id < 1, so it is always usable.
+ return true;
+ }
+ return ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeValid(1, baseTxnId);
+ }
private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) {
@@ -606,13 +643,22 @@ public class AcidUtils {
String fn = p.getName();
if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
long txn = parseBase(p);
+ if(bestBase.oldestBaseTxnId > txn) {
+ //keep track for error reporting
+ bestBase.oldestBase = p;
+ bestBase.oldestBaseTxnId = txn;
+ }
if (bestBase.status == null) {
- bestBase.status = child;
- bestBase.txn = txn;
+ if(isValidBase(txn, txnList)) {
+ bestBase.status = child;
+ bestBase.txn = txn;
+ }
} else if (bestBase.txn < txn) {
- obsolete.add(bestBase.status);
- bestBase.status = child;
- bestBase.txn = txn;
+ if(isValidBase(txn, txnList)) {
+ obsolete.add(bestBase.status);
+ bestBase.status = child;
+ bestBase.txn = txn;
+ }
} else {
obsolete.add(child);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0896025e/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
index f5eb8a1..08fcff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
@@ -47,6 +47,7 @@ public class AcidOpenTxnsCounterService extends HouseKeeperServiceBase {
return "Count number of open transactions";
}
private static final class OpenTxnsCounter implements Runnable {
+ private static volatile long lastLogTime = 0;
private final TxnStore txnHandler;
private final AtomicInteger isAliveCounter;
private OpenTxnsCounter(HiveConf hiveConf, AtomicInteger isAliveCounter) {
@@ -59,7 +60,12 @@ public class AcidOpenTxnsCounterService extends HouseKeeperServiceBase {
long startTime = System.currentTimeMillis();
txnHandler.countOpenTxns();
int count = isAliveCounter.incrementAndGet();
- LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count);
+ if(System.currentTimeMillis() - lastLogTime > 60*1000) {
+ //don't flood the logs with too many msgs
+ LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime) / 1000 +
+ "seconds. isAliveCounter=" + count);
+ lastLogTime = System.currentTimeMillis();
+ }
}
catch(Throwable t) {
LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
http://git-wip-us.apache.org/repos/asf/hive/blob/0896025e/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 5745dee..b83cea4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -27,8 +27,10 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -217,25 +219,107 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]),
new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]),
new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_98_100/bucket_0", 500, new byte[0]),
new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_120_130/bucket_0", 500, new byte[0]),
new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "/tbl/part1");
AcidUtils.Directory dir =
AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
- assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString());
+ assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString());
+ assertEquals(1, dir.getCurrentDirectories().size());
+ assertEquals("mock:/tbl/part1/delta_120_130",
+ dir.getCurrentDirectories().get(0).getPath().toString());
List<FileStatus> obsoletes = dir.getObsolete();
assertEquals(4, obsoletes.size());
assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString());
- assertEquals("mock:/tbl/part1/base_100", obsoletes.get(1).getPath().toString());
- assertEquals("mock:/tbl/part1/base_25", obsoletes.get(2).getPath().toString());
- assertEquals("mock:/tbl/part1/base_5", obsoletes.get(3).getPath().toString());
+ assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString());
+ assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString());
assertEquals(0, dir.getOriginalFiles().size());
- assertEquals(0, dir.getCurrentDirectories().size());
- // we should always get the latest base
+
dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:"));
- assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString());
- }
+ assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
+ assertEquals(0, dir.getCurrentDirectories().size());
+ obsoletes = dir.getObsolete();
+ assertEquals(1, obsoletes.size());
+ assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).getPath().toString());
+ assertEquals(0, dir.getOriginalFiles().size());
+ /*Single-statement txns only: since we don't compact a txn range that includes an open txn,
+ the existence of delta_120_130 implies that 121 in the exception list is aborted unless
+ delta_120_130 is from streaming ingest in which case 121 can be open
+ (and thus 122-130 are open too)
+ For multi-statement txns, see HIVE-13369*/
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121"));
+ assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString());
+ assertEquals(1, dir.getCurrentDirectories().size());
+ assertEquals("mock:/tbl/part1/delta_120_130",
+ dir.getCurrentDirectories().get(0).getPath().toString());
+ obsoletes = dir.getObsolete();
+ assertEquals(4, obsoletes.size());
+ assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString());
+ assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString());
+
+ boolean gotException = false;
+ try {
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5"));
+ }
+ catch(IOException e) {
+ gotException = true;
+ Assert.assertEquals("Not enough history available for (125,5). Oldest available base: " +
+ "mock:/tbl/part1/base_5", e.getMessage());
+ }
+ Assert.assertTrue("Expected exception", gotException);
+
+ fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_1_10/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_12_25/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
+ part = new MockPath(fs, "/tbl/part1");
+ try {
+ gotException = false;
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7"));
+ }
+ catch(IOException e) {
+ gotException = true;
+ Assert.assertEquals("Not enough history available for (150,7). Oldest available base: " +
+ "mock:/tbl/part1/base_25", e.getMessage());
+ }
+ Assert.assertTrue("Expected exception", gotException);
+
+ fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_2_10/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
+ part = new MockPath(fs, "/tbl/part1");
+ try {
+ gotException = false;
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7"));
+ }
+ catch(IOException e) {
+ gotException = true;
+ Assert.assertEquals("Not enough history available for (150,7). Oldest available base: " +
+ "mock:/tbl/part1/base_25", e.getMessage());
+ }
+ Assert.assertTrue("Expected exception", gotException);
+
+ fs = new MockFileSystem(conf,
+ //non-acid to acid table conversion
+ new MockFile("mock:/tbl/part1/base_" + Long.MIN_VALUE + "/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
+ part = new MockPath(fs, "/tbl/part1");
+ //note that we don't include the client's current txn in the exception list, to support read-your-writes
+ dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:"));
+ assertEquals("mock:/tbl/part1/base_" + Long.MIN_VALUE, dir.getBaseDirectory().toString());
+ assertEquals(1, dir.getCurrentDirectories().size());
+ assertEquals("mock:/tbl/part1/delta_1_1", dir.getCurrentDirectories().get(0).getPath().toString());
+ assertEquals(0, dir.getObsolete().size());
+ }
@Test
public void testObsoleteOriginals() throws Exception {
Configuration conf = new Configuration();