You are viewing a plain-text version of this content; the canonical link ("here") was a hyperlink that is not preserved in this copy.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/07/08 21:24:50 UTC
hive git commit: HIVE-13392 disable speculative execution for ACID Compactor (Eugene Koifman, reviewed by Alan Gates)
Repository: hive
Updated Branches:
refs/heads/branch-2.1 e1cec964d -> 39ecc205e
HIVE-13392 disable speculative execution for ACID Compactor (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/39ecc205
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/39ecc205
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/39ecc205
Branch: refs/heads/branch-2.1
Commit: 39ecc205e64cd1808bebec3ae1dc448e01c48680
Parents: e1cec96
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Jul 8 13:17:29 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Jul 8 14:24:28 2016 -0700
----------------------------------------------------------------------
.../hive/common/ValidCompactorTxnList.java | 111 +++++++++++++++++++
.../hive/metastore/txn/CompactionInfo.java | 1 +
.../hadoop/hive/metastore/txn/TxnUtils.java | 1 +
.../metastore/txn/ValidCompactorTxnList.java | 111 -------------------
.../txn/TestValidCompactorTxnList.java | 1 +
.../hive/ql/txn/compactor/CompactorMR.java | 8 +-
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 2 +-
7 files changed, 121 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
new file mode 100644
index 0000000..ad79e2c
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+
+import java.util.Arrays;
+
+/**
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
+ * is committed or aborted. Additionally it will return NONE if there are any open transactions
+ * below the max transaction given, since we don't want to compact above open transactions. For
+ * {@link #isTxnValid} it will still view a transaction as valid only if it is committed. These
+ * produce the logic we need to assure that the compactor only sees records less than the lowest
+ * open transaction when choosing which files to compact, but that it still ignores aborted
+ * records when compacting.
+ */
+public class ValidCompactorTxnList extends ValidReadTxnList {
+ //TODO: refactor this - minOpenTxn is not needed if we set
+ // highWatermark = Math.min(highWaterMark, minOpenTxn) (assuming there are open txns)
+ // NOTE(review): readFromString's empty-input branch leaves minOpenTxn unchanged (0 if never set, not -1) - confirm intended.
+ // The minimum open transaction id; a negative value (the no-arg ctor uses -1) makes isTxnRangeValid fall back to highWatermark.
+ private long minOpenTxn;
+
+ public ValidCompactorTxnList() {
+ super();
+ minOpenTxn = -1;
+ }
+
+ /**
+ * Creates a compactor view of an explicit transaction snapshot.
+ * @param exceptions list of all open and aborted transactions
+ * @param minOpen lowest open transaction
+ * @param highWatermark highest committed transaction
+ */
+ public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) {
+ super(exceptions, highWatermark);
+ minOpenTxn = minOpen;
+ }
+
+ public ValidCompactorTxnList(String value) {
+ super(value);
+ }
+ // NONE if the range starts above highWatermark; otherwise ALL only when maxTxnId < minOpenTxn (or <= highWatermark if minOpenTxn < 0). Never SOME.
+ @Override
+ public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
+ if (highWatermark < minTxnId) {
+ return RangeResponse.NONE;
+ } else if (minOpenTxn < 0) {
+ return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
+ } else {
+ return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
+ }
+ }
+ // Serializes as "highWatermark:minOpenTxn[:exception]..."; with no exceptions a trailing ':' is written ("hw:min:").
+ @Override
+ public String writeToString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(highWatermark);
+ buf.append(':');
+ buf.append(minOpenTxn);
+ if (exceptions.length == 0) {
+ buf.append(':');
+ } else {
+ for(long except: exceptions) {
+ buf.append(':');
+ buf.append(except);
+ }
+ }
+ return buf.toString();
+ }
+ // Inverse of writeToString(); String.split drops trailing empty fields, so "hw:min:" yields an empty exceptions array.
+ @Override
+ public void readFromString(String src) {
+ if (src == null || src.length() == 0) {
+ highWatermark = Long.MAX_VALUE;
+ exceptions = new long[0];
+ } else {
+ String[] values = src.split(":");
+ highWatermark = Long.parseLong(values[0]);
+ minOpenTxn = Long.parseLong(values[1]);
+ exceptions = new long[values.length - 2];
+ for(int i = 2; i < values.length; ++i) {
+ exceptions[i-2] = Long.parseLong(values[i]);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public long getMinOpenTxn() {
+ return minOpenTxn;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 85e0885..413ce3b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.metastore.txn;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 46348ea..39b18ac 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.metastore.txn;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
deleted file mode 100644
index 30bdfa7..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore.txn;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-
-import java.util.Arrays;
-
-/**
- * And implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
- * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
- * is committed or aborted. Additionally it will return none if there are any open transactions
- * below the max transaction given, since we don't want to compact above open transactions. For
- * {@link #isTxnValid} it will still view a transaction as valid only if it is committed. These
- * produce the logic we need to assure that the compactor only sees records less than the lowest
- * open transaction when choosing which files to compact, but that it still ignores aborted
- * records when compacting.
- */
-public class ValidCompactorTxnList extends ValidReadTxnList {
- //TODO: refactor this - minOpenTxn is not needed if we set
- // highWatermark = Math.min(highWaterMark, minOpenTxn) (assuming there are open txns)
-
- // The minimum open transaction id
- private long minOpenTxn;
-
- public ValidCompactorTxnList() {
- super();
- minOpenTxn = -1;
- }
-
- /**
- *
- * @param exceptions list of all open and aborted transactions
- * @param minOpen lowest open transaction
- * @param highWatermark highest committed transaction
- */
- public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) {
- super(exceptions, highWatermark);
- minOpenTxn = minOpen;
- }
-
- public ValidCompactorTxnList(String value) {
- super(value);
- }
-
- @Override
- public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
- if (highWatermark < minTxnId) {
- return RangeResponse.NONE;
- } else if (minOpenTxn < 0) {
- return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
- } else {
- return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
- }
- }
-
- @Override
- public String writeToString() {
- StringBuilder buf = new StringBuilder();
- buf.append(highWatermark);
- buf.append(':');
- buf.append(minOpenTxn);
- if (exceptions.length == 0) {
- buf.append(':');
- } else {
- for(long except: exceptions) {
- buf.append(':');
- buf.append(except);
- }
- }
- return buf.toString();
- }
-
- @Override
- public void readFromString(String src) {
- if (src == null || src.length() == 0) {
- highWatermark = Long.MAX_VALUE;
- exceptions = new long[0];
- } else {
- String[] values = src.split(":");
- highWatermark = Long.parseLong(values[0]);
- minOpenTxn = Long.parseLong(values[1]);
- exceptions = new long[values.length - 2];
- for(int i = 2; i < values.length; ++i) {
- exceptions[i-2] = Long.parseLong(values[i]);
- }
- }
- }
-
- @VisibleForTesting
- long getMinOpenTxn() {
- return minOpenTxn;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
index c0923eb..c249854 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.metastore.txn;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index b54a95d..6caca98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -27,7 +28,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -133,6 +133,10 @@ public class CompactorMR {
overrideTblProps(job, t.getParameters(), ci.properties);
}
setColumnTypes(job, sd.getCols());
+ //with feature on, multiple tasks may get into conflict creating/using TMP_LOCATION and if we were
+ //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter
+ //to do the final move
+ job.setBoolean("mapreduce.map.speculative", false);
return job;
}
@@ -623,7 +627,7 @@ public class CompactorMR {
AcidInputFormat<WritableComparable, V> aif =
instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME));
ValidTxnList txnList =
- new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
+ new ValidCompactorTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
AcidInputFormat.RawReader<V> reader =
http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 83a2ba3..5745dee 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.io;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;