Posted to commits@hive.apache.org by pv...@apache.org on 2021/08/31 08:42:46 UTC
[hive] branch master updated: HIVE-25480: Fix Time Travel with CBO (#2602) (Peter Vary reviewed by Adam Szita)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 5b30cd8 HIVE-25480: Fix Time Travel with CBO (#2602) (Peter Vary reviewed by Adam Szita)
5b30cd8 is described below
commit 5b30cd879b8ef5a4aecbfcaf4366cb8608222909
Author: pvary <pv...@cloudera.com>
AuthorDate: Tue Aug 31 10:42:36 2021 +0200
HIVE-25480: Fix Time Travel with CBO (#2602) (Peter Vary reviewed by Adam Szita)
---
.../mr/hive/TestHiveIcebergSchemaEvolution.java | 7 +++---
.../org/apache/iceberg/mr/hive/TestHiveShell.java | 2 +-
.../apache/hadoop/hive/ql/io/HiveInputFormat.java | 16 +++++++++---
.../org/apache/hadoop/hive/ql/metadata/Table.java | 22 ++++++++++------
.../optimizer/calcite/translator/ASTBuilder.java | 13 ++++++++++
.../hadoop/hive/ql/parse/CalcitePlanner.java | 3 ++-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 11 ++------
.../apache/hadoop/hive/ql/plan/TableScanDesc.java | 29 +++++-----------------
8 files changed, 55 insertions(+), 48 deletions(-)
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java
index 96d7850..9b3c941 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java
@@ -127,7 +127,7 @@ public class TestHiveIcebergSchemaEvolution extends HiveIcebergStorageHandlerWit
shell.executeStatement("ALTER TABLE orders CHANGE COLUMN " +
"item fruit string");
List<Object[]> result = shell.executeStatement("SELECT customer_first_name, customer_last_name, SUM(quantity) " +
- "FROM orders where price >= 3 group by customer_first_name, customer_last_name");
+ "FROM orders where price >= 3 group by customer_first_name, customer_last_name order by customer_first_name");
assertQueryResult(result, 4,
"Doctor", "Strange", 900L,
@@ -140,7 +140,8 @@ public class TestHiveIcebergSchemaEvolution extends HiveIcebergStorageHandlerWit
shell.executeStatement("ALTER TABLE orders ADD COLUMNS (nickname string)");
shell.executeStatement("INSERT INTO orders VALUES (7, 'Romanoff', 'Natasha', 3, 250, 'apple', 'Black Widow')");
result = shell.executeStatement("SELECT customer_first_name, customer_last_name, nickname, SUM(quantity) " +
- " FROM orders where price >= 3 group by customer_first_name, customer_last_name, nickname");
+ " FROM orders where price >= 3 group by customer_first_name, customer_last_name, nickname " +
+ " order by customer_first_name");
assertQueryResult(result, 5,
"Doctor", "Strange", null, 900L,
"Natasha", "Romanoff", "Black Widow", 250L,
@@ -152,7 +153,7 @@ public class TestHiveIcebergSchemaEvolution extends HiveIcebergStorageHandlerWit
shell.executeStatement("ALTER TABLE orders CHANGE COLUMN fruit fruit string AFTER nickname");
result = shell.executeStatement("SELECT customer_first_name, customer_last_name, nickname, fruit, SUM(quantity) " +
" FROM orders where price >= 3 and fruit < 'o' group by customer_first_name, customer_last_name, nickname, " +
- "fruit");
+ "fruit order by customer_first_name");
assertQueryResult(result, 4,
"Doctor", "Strange", null, "apple", 100L,
"Natasha", "Romanoff", "Black Widow", "apple", 250L,
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
index b3c9440..3d39889 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
@@ -185,7 +185,7 @@ public class TestHiveShell {
hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT, -1);
// Switch off optimizers in order to contain the map reduction within this JVM
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED, true);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT, false);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVEMETADATAONLYQUERIES, false);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, false);
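
Note: flipping HIVE_CBO_ENABLED to true means the Iceberg handler tests are now planned through Calcite, which is the code path that previously lost the time-travel information. A minimal standalone sketch of the same flag flip, assuming a fresh HiveConf rather than the shared test configuration, is:

import org.apache.hadoop.hive.conf.HiveConf;

public class EnableCboSketch {
  public static void main(String[] args) {
    // Sketch only: enable the cost-based optimizer on a configuration object,
    // as the test shell above now does on its shared HiveConf.
    HiveConf hiveConf = new HiveConf();
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED, true);
    System.out.println(hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED));
  }
}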
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index f48f65d..372f9af 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.type.TimestampTZ;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -75,6 +78,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -936,12 +940,16 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
protected static void pushAsOf(Configuration jobConf, TableScanOperator ts) {
TableScanDesc scanDesc = ts.getConf();
- if (scanDesc.getAsOfTimestamp() != -1) {
- jobConf.set(TableScanDesc.AS_OF_TIMESTAMP, Long.toString(scanDesc.getAsOfTimestamp()));
+ if (scanDesc.getAsOfTimestamp() != null) {
+ ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() :
+ SessionState.get().getConf().getLocalTimeZone();
+ TimestampTZ time = TimestampTZUtil.parse(PlanUtils.stripQuotes(scanDesc.getAsOfTimestamp()), timeZone);
+
+ jobConf.set(TableScanDesc.AS_OF_TIMESTAMP, Long.toString(time.toEpochMilli()));
}
- if (scanDesc.getAsOfVersion() != -1) {
- jobConf.set(TableScanDesc.AS_OF_VERSION, Long.toString(scanDesc.getAsOfVersion()));
+ if (scanDesc.getAsOfVersion() != null) {
+ jobConf.set(TableScanDesc.AS_OF_VERSION, scanDesc.getAsOfVersion());
}
}
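
Note: pushAsOf now receives the raw AS OF strings from TableScanDesc and converts the timestamp to epoch milliseconds only when pushing it into the job configuration, using the session's local time zone. A minimal standalone sketch of that conversion, assuming a UTC session time zone and a sample literal, is shown below; the class wrapper and the literal are invented for illustration, and the real pushAsOf additionally strips surrounding quotes via PlanUtils.stripQuotes.

import java.time.ZoneId;
import org.apache.hadoop.hive.common.type.TimestampTZ;
import org.apache.hadoop.hive.common.type.TimestampTZUtil;

public class AsOfTimestampSketch {
  public static void main(String[] args) {
    // Same conversion pushAsOf performs: parse the user-supplied timestamp literal
    // in the session time zone, then hand epoch milliseconds to the job conf.
    ZoneId timeZone = ZoneId.of("UTC");
    TimestampTZ time = TimestampTZUtil.parse("2021-08-31 10:42:36", timeZone);
    System.out.println(time.toEpochMilli());
  }
}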
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 2ad1d17..c7549c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
@@ -128,12 +129,13 @@ public class Table implements Serializable {
/**
* The version of the table. For Iceberg tables this is the snapshotId.
*/
- private long asOfVersion = -1;
+ private String asOfVersion = null;
/**
- * The version of the table at the given timestamp. This is the epoch millisecond.
+ * The version of the table at the given timestamp. The format will be parsed with
+ * TimestampTZUtil.parse.
*/
- private long asOfTimestamp = -1;
+ private String asOfTimestamp = null;
/**
* Used only for serialization.
@@ -567,6 +569,12 @@ public class Table implements Serializable {
} else if (!tTable.equals(other.tTable)) {
return false;
}
+ if (!Objects.equals(asOfTimestamp, other.asOfTimestamp)) {
+ return false;
+ }
+ if (!Objects.equals(asOfVersion, other.asOfVersion)) {
+ return false;
+ }
return true;
}
@@ -1282,19 +1290,19 @@ public class Table implements Serializable {
return result;
}
- public long getAsOfVersion() {
+ public String getAsOfVersion() {
return asOfVersion;
}
- public void setAsOfVersion(long asOfVersion) {
+ public void setAsOfVersion(String asOfVersion) {
this.asOfVersion = asOfVersion;
}
- public long getAsOfTimestamp() {
+ public String getAsOfTimestamp() {
return asOfTimestamp;
}
- public void setAsOfTimestamp(long asOfTimestamp) {
+ public void setAsOfTimestamp(String asOfTimestamp) {
this.asOfTimestamp = asOfTimestamp;
}
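
Note: Table now keeps the AS OF version and timestamp as the raw String literals from the query (null when no time travel is requested) and folds them into equals(), so two Table objects for the same underlying table but different time-travel specifications no longer compare equal. A minimal sketch of that null-safe comparison pattern follows; TimeTravelKey is illustrative and not a Hive class.

import java.util.Objects;

public class TimeTravelKey {
  private final String asOfVersion;   // snapshot id as written in the query, or null
  private final String asOfTimestamp; // timestamp literal as written in the query, or null

  public TimeTravelKey(String asOfVersion, String asOfTimestamp) {
    this.asOfVersion = asOfVersion;
    this.asOfTimestamp = asOfTimestamp;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TimeTravelKey)) {
      return false;
    }
    TimeTravelKey other = (TimeTravelKey) o;
    // Objects.equals handles the null case, mirroring the new checks in Table.equals().
    return Objects.equals(asOfVersion, other.asOfVersion)
        && Objects.equals(asOfTimestamp, other.asOfTimestamp);
  }

  @Override
  public int hashCode() {
    return Objects.hash(asOfVersion, asOfTimestamp);
  }
}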
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
index 9fbdedb..264da23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
@@ -31,6 +31,7 @@ import org.apache.calcite.util.TimeString;
import org.apache.calcite.util.TimestampString;
import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
@@ -86,6 +87,18 @@ public class ASTBuilder {
ASTBuilder b = ASTBuilder.construct(HiveParser.TOK_TABREF, "TOK_TABREF").add(tableNameBuilder);
+ if (hTbl.getHiveTableMD().getAsOfTimestamp() != null) {
+ ASTBuilder asOfBuilder = ASTBuilder.construct(HiveParser.TOK_AS_OF_TIME, "TOK_AS_OF_TIME")
+ .add(HiveParser.StringLiteral, hTbl.getHiveTableMD().getAsOfTimestamp());
+ b.add(asOfBuilder);
+ }
+
+ if (hTbl.getHiveTableMD().getAsOfVersion() != null) {
+ ASTBuilder asOfBuilder = ASTBuilder.construct(HiveParser.TOK_AS_OF_VERSION, "TOK_AS_OF_VERSION")
+ .add(HiveParser.Number, hTbl.getHiveTableMD().getAsOfVersion());
+ b.add(asOfBuilder);
+ }
+
ASTBuilder propList = ASTBuilder.construct(HiveParser.TOK_TABLEPROPLIST, "TOK_TABLEPROPLIST");
if (scan instanceof DruidQuery) {
//Passing query spec, column names and column types to be used as part of Hive Physical execution
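
Note: this ASTBuilder change is the core of the fix. When CBO converts the optimized Calcite plan back into an AST, the table reference is rebuilt here, and before this change the time-travel clause was silently dropped. The rebuilt TOK_TABREF now carries TOK_AS_OF_TIME or TOK_AS_OF_VERSION again, so queries of the following shape keep their AS OF clause across the CBO round-trip. The sketch assumes the FOR SYSTEM_TIME AS OF / FOR SYSTEM_VERSION AS OF syntax; table name and literals are illustrative only.

public class TimeTravelQuerySketch {
  // Sketch only: example time-travel queries whose AS OF information is preserved
  // through the Calcite plan -> AST conversion after this change.
  static final String AS_OF_TIME_QUERY =
      "SELECT * FROM orders FOR SYSTEM_TIME AS OF '2021-08-31 10:42:36'";
  static final String AS_OF_VERSION_QUERY =
      "SELECT * FROM orders FOR SYSTEM_VERSION AS OF 1234567890";
}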
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 5a034ae..99c8045 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -530,7 +530,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
List<ASTNode> oldHints = new ArrayList<>();
// Cache the hints before CBO runs and removes them.
// Use the hints later in top level QB.
- getHintsFromQB(getQB(), oldHints);
+ getHintsFromQB(getQB(), oldHints);
// Note: for now, we don't actually pass the queryForCbo to CBO, because
// it accepts qb, not AST, and can also access all the private stuff in
@@ -612,6 +612,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
if (!doPhase1(newAST, getQB(), ctx_1, null)) {
throw new RuntimeException("Couldn't do phase1 on CBO optimized query plan");
}
+
// unfortunately making prunedPartitions immutable is not possible
// here with SemiJoins not all tables are costed in CBO, so their
// PartitionList is not evaluated until the run phase.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 1b53e51..385a54a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -2246,15 +2246,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if (!Optional.ofNullable(tab.getStorageHandler()).map(HiveStorageHandler::isTimeTravelAllowed).orElse(false)) {
throw new SemanticException(ErrorMsg.TIME_TRAVEL_NOT_ALLOWED, alias);
}
- if (asOf.getLeft() != null) {
- tab.setAsOfVersion(Long.parseLong(asOf.getLeft()));
- }
- if (asOf.getRight() != null) {
- ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() :
- SessionState.get().getConf().getLocalTimeZone();
- TimestampTZ ts = TimestampTZUtil.parse(stripQuotes(asOf.getRight()), timeZone);
- tab.setAsOfTimestamp(ts.toEpochMilli());
- }
+ tab.setAsOfVersion(asOf.getLeft());
+ tab.setAsOfTimestamp(asOf.getRight());
}
if (tab.isView()) {
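
Note: SemanticAnalyzer no longer parses the version into a long or converts the timestamp to epoch milliseconds at analysis time; it simply stores the raw literals on the Table, and the conversion happens later in HiveInputFormat.pushAsOf (see above). A rough sketch of the resulting data flow, with an illustrative class and sample values that are not part of the commit, is:

public class AsOfFlowSketch {
  // Sketch only: the AS OF values stay as raw Strings from parse time onward.
  String asOfVersion;   // e.g. "1234567890"           -> written to TableScanDesc.AS_OF_VERSION as-is
  String asOfTimestamp; // e.g. "'2021-08-31 10:42:36'" -> parsed to epoch millis only in pushAsOf
}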
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index a91cf56..ced417b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -141,9 +141,9 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
private int numBuckets = -1;
- private long asOfVersion = -1;
+ private String asOfVersion = null;
- private long asOfTimestamp = -1;
+ private String asOfTimestamp = null;
public TableScanDesc() {
this(null, null);
@@ -536,30 +536,13 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
: storageHandler.getOperatorDescProperties(this, opProps);
}
- @Explain(displayName = "As of version", explainLevels = { Level.EXTENDED })
- public String getAsOfVersionText() {
- if (asOfVersion != -1) {
- return String.valueOf(asOfVersion);
- } else {
- return null;
- }
- }
-
- public long getAsOfVersion() {
+ @Explain(displayName = "As of version")
+ public String getAsOfVersion() {
return asOfVersion;
}
- @Explain(displayName = "As of timestamp", explainLevels = { Level.EXTENDED })
- public String getAsOfTimestampText() {
- if (asOfTimestamp != -1) {
- DateFormat format = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
- return format.format(new Date(asOfTimestamp));
- } else {
- return null;
- }
- }
-
- public long getAsOfTimestamp() {
+ @Explain(displayName = "As of timestamp")
+ public String getAsOfTimestamp() {
return asOfTimestamp;
}
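
Note: since the fields are now Strings that are simply null when no time travel is requested, the getters can carry the @Explain annotation directly (at the default explain level) and the separate *Text() formatting methods are removed. A minimal sketch of that pattern follows; HypotheticalDesc is illustrative and not a Hive class.

import org.apache.hadoop.hive.ql.plan.Explain;

public class HypotheticalDesc {
  // null means "no time travel requested", so Explain output simply omits the row.
  private String asOfVersion = null;

  @Explain(displayName = "As of version")
  public String getAsOfVersion() {
    return asOfVersion;
  }
}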