Posted to commits@hive.apache.org by pv...@apache.org on 2021/08/31 08:42:46 UTC

[hive] branch master updated: HIVE-25480: Fix Time Travel with CBO (#2602) (Peter Vary reviewed by Adam Szita)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b30cd8  HIVE-25480: Fix Time Travel with CBO (#2602) (Peter Vary reviewed by Adam Szita)
5b30cd8 is described below

commit 5b30cd879b8ef5a4aecbfcaf4366cb8608222909
Author: pvary <pv...@cloudera.com>
AuthorDate: Tue Aug 31 10:42:36 2021 +0200

    HIVE-25480: Fix Time Travel with CBO (#2602) (Peter Vary reviewed by Adam Szita)
---
 .../mr/hive/TestHiveIcebergSchemaEvolution.java    |  7 +++---
 .../org/apache/iceberg/mr/hive/TestHiveShell.java  |  2 +-
 .../apache/hadoop/hive/ql/io/HiveInputFormat.java  | 16 +++++++++---
 .../org/apache/hadoop/hive/ql/metadata/Table.java  | 22 ++++++++++------
 .../optimizer/calcite/translator/ASTBuilder.java   | 13 ++++++++++
 .../hadoop/hive/ql/parse/CalcitePlanner.java       |  3 ++-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     | 11 ++------
 .../apache/hadoop/hive/ql/plan/TableScanDesc.java  | 29 +++++-----------------
 8 files changed, 55 insertions(+), 48 deletions(-)
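In short: SemanticAnalyzer used to parse the AS OF clause eagerly into long fields (a numeric snapshot id and an epoch-millisecond timestamp). With CBO enabled, CalcitePlanner converts the optimized plan back into an AST through ASTBuilder, which rebuilt the table reference without any AS OF children, so the second analysis pass silently lost the time-travel clause. This patch keeps the raw string literals on Table and TableScanDesc, re-emits TOK_AS_OF_TIME / TOK_AS_OF_VERSION when the AST is rebuilt, and defers timestamp parsing to HiveInputFormat.pushAsOf().

The affected queries have this shape (a sketch using Hive's Iceberg time-travel syntax; the table name and values are illustrative, not part of this diff):

    -- Time travel by timestamp and by snapshot id:
    SELECT * FROM orders FOR SYSTEM_TIME AS OF '2021-08-30 10:00:00';
    SELECT * FROM orders FOR SYSTEM_VERSION AS OF 1234567890123456789;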

diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java
index 96d7850..9b3c941 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java
@@ -127,7 +127,7 @@ public class TestHiveIcebergSchemaEvolution extends HiveIcebergStorageHandlerWit
     shell.executeStatement("ALTER TABLE orders CHANGE COLUMN " +
         "item fruit string");
     List<Object[]> result = shell.executeStatement("SELECT customer_first_name, customer_last_name, SUM(quantity) " +
-        "FROM orders where price >= 3 group by customer_first_name, customer_last_name");
+        "FROM orders where price >= 3 group by customer_first_name, customer_last_name order by customer_first_name");
 
     assertQueryResult(result, 4,
         "Doctor", "Strange", 900L,
@@ -140,7 +140,8 @@ public class TestHiveIcebergSchemaEvolution extends HiveIcebergStorageHandlerWit
     shell.executeStatement("ALTER TABLE orders ADD COLUMNS (nickname string)");
     shell.executeStatement("INSERT INTO orders VALUES (7, 'Romanoff', 'Natasha', 3, 250, 'apple', 'Black Widow')");
     result = shell.executeStatement("SELECT customer_first_name, customer_last_name, nickname, SUM(quantity) " +
-        " FROM orders where price >= 3 group by customer_first_name, customer_last_name, nickname");
+        " FROM orders where price >= 3 group by customer_first_name, customer_last_name, nickname " +
+        " order by customer_first_name");
     assertQueryResult(result, 5,
         "Doctor", "Strange", null, 900L,
         "Natasha", "Romanoff", "Black Widow", 250L,
@@ -152,7 +153,7 @@ public class TestHiveIcebergSchemaEvolution extends HiveIcebergStorageHandlerWit
     shell.executeStatement("ALTER TABLE orders CHANGE COLUMN fruit fruit string AFTER nickname");
     result = shell.executeStatement("SELECT customer_first_name, customer_last_name, nickname, fruit, SUM(quantity) " +
         " FROM orders where price >= 3 and fruit < 'o' group by customer_first_name, customer_last_name, nickname, " +
-        "fruit");
+        "fruit order by customer_first_name");
     assertQueryResult(result, 4,
         "Doctor", "Strange", null, "apple", 100L,
         "Natasha", "Romanoff", "Black Widow", "apple", 250L,
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
index b3c9440..3d39889 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
@@ -185,7 +185,7 @@ public class TestHiveShell {
     hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT, -1);
 
     // Switch off optimizers in order to contain the map reduction within this JVM
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED, true);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT, false);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVEMETADATAONLYQUERIES, false);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, false);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index f48f65d..372f9af 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.type.TimestampTZ;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -75,6 +78,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -936,12 +940,16 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
 
   protected static void pushAsOf(Configuration jobConf, TableScanOperator ts) {
     TableScanDesc scanDesc = ts.getConf();
-    if (scanDesc.getAsOfTimestamp() != -1) {
-      jobConf.set(TableScanDesc.AS_OF_TIMESTAMP, Long.toString(scanDesc.getAsOfTimestamp()));
+    if (scanDesc.getAsOfTimestamp() != null) {
+      ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() :
+          SessionState.get().getConf().getLocalTimeZone();
+      TimestampTZ time = TimestampTZUtil.parse(PlanUtils.stripQuotes(scanDesc.getAsOfTimestamp()), timeZone);
+
+      jobConf.set(TableScanDesc.AS_OF_TIMESTAMP, Long.toString(time.toEpochMilli()));
     }
 
-    if (scanDesc.getAsOfVersion() != -1) {
-      jobConf.set(TableScanDesc.AS_OF_VERSION, Long.toString(scanDesc.getAsOfVersion()));
+    if (scanDesc.getAsOfVersion() != null) {
+      jobConf.set(TableScanDesc.AS_OF_VERSION, scanDesc.getAsOfVersion());
     }
   }
 
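The parsing that SemanticAnalyzer previously did now happens here, at job-submission time: the AS OF literal travels through the plan as the user's original string, quotes included (hence PlanUtils.stripQuotes), and is only converted to epoch milliseconds in the session-local time zone. A minimal standalone sketch of that conversion, with an illustrative literal and zone standing in for the stripped AS OF string and the session zone:

    // Sketch of the conversion pushAsOf() performs; values are illustrative.
    import java.time.ZoneId;
    import org.apache.hadoop.hive.common.type.TimestampTZ;
    import org.apache.hadoop.hive.common.type.TimestampTZUtil;

    public class AsOfConversionSketch {
      public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Europe/Budapest");   // stand-in for the session zone
        TimestampTZ time = TimestampTZUtil.parse("2021-08-30 10:00:00", zone);
        System.out.println(time.toEpochMilli());      // value written to AS_OF_TIMESTAMP
      }
    }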
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 2ad1d17..c7549c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 
@@ -128,12 +129,13 @@ public class Table implements Serializable {
   /**
    * The version of the table. For Iceberg tables this is the snapshotId.
    */
-  private long asOfVersion = -1;
+  private String asOfVersion = null;
 
   /**
-   * The version of the table at the given timestamp. This is the epoch millisecond.
+   * The version of the table at the given timestamp. The format will be parsed with
+   * TimestampTZUtil.parse.
    */
-  private long asOfTimestamp = -1;
+  private String asOfTimestamp = null;
 
   /**
    * Used only for serialization.
@@ -567,6 +569,12 @@ public class Table implements Serializable {
     } else if (!tTable.equals(other.tTable)) {
       return false;
     }
+    if (!Objects.equals(asOfTimestamp, other.asOfTimestamp)) {
+      return false;
+    }
+    if (!Objects.equals(asOfVersion, other.asOfVersion)) {
+      return false;
+    }
     return true;
   }
 
@@ -1282,19 +1290,19 @@ public class Table implements Serializable {
     return result;
   }
 
-  public long getAsOfVersion() {
+  public String getAsOfVersion() {
     return asOfVersion;
   }
 
-  public void setAsOfVersion(long asOfVersion) {
+  public void setAsOfVersion(String asOfVersion) {
     this.asOfVersion = asOfVersion;
   }
 
-  public long getAsOfTimestamp() {
+  public String getAsOfTimestamp() {
     return asOfTimestamp;
   }
 
-  public void setAsOfTimestamp(long asOfTimestamp) {
+  public void setAsOfTimestamp(String asOfTimestamp) {
     this.asOfTimestamp = asOfTimestamp;
   }
 
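Replacing the long fields with nullable Strings makes "not set" unambiguous (null instead of the -1 sentinel) and preserves the user's literal verbatim across the CBO round trip. The new equals() clauses rely on java.util.Objects.equals for null safety; a two-line reminder of its semantics:

    // Objects.equals(a, b): true when both are null, false when exactly one is.
    import java.util.Objects;

    public class NullSafeEqualsSketch {
      public static void main(String[] args) {
        System.out.println(Objects.equals(null, null));  // true  - both unset
        System.out.println(Objects.equals("123", null)); // false - only one set
      }
    }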
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
index 9fbdedb..264da23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
@@ -31,6 +31,7 @@ import org.apache.calcite.util.TimeString;
 import org.apache.calcite.util.TimestampString;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
@@ -86,6 +87,18 @@ public class ASTBuilder {
 
     ASTBuilder b = ASTBuilder.construct(HiveParser.TOK_TABREF, "TOK_TABREF").add(tableNameBuilder);
 
+    if (hTbl.getHiveTableMD().getAsOfTimestamp() != null) {
+      ASTBuilder asOfBuilder = ASTBuilder.construct(HiveParser.TOK_AS_OF_TIME, "TOK_AS_OF_TIME")
+          .add(HiveParser.StringLiteral, hTbl.getHiveTableMD().getAsOfTimestamp());
+      b.add(asOfBuilder);
+    }
+
+    if (hTbl.getHiveTableMD().getAsOfVersion() != null) {
+      ASTBuilder asOfBuilder = ASTBuilder.construct(HiveParser.TOK_AS_OF_VERSION, "TOK_AS_OF_VERSION")
+          .add(HiveParser.Number, hTbl.getHiveTableMD().getAsOfVersion());
+      b.add(asOfBuilder);
+    }
+
     ASTBuilder propList = ASTBuilder.construct(HiveParser.TOK_TABLEPROPLIST, "TOK_TABLEPROPLIST");
     if (scan instanceof DruidQuery) {
       //Passing query spec, column names and column types to be used as part of Hive Physical execution
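This is the core of the fix: ASTBuilder.table() is what CalcitePlanner uses to turn the optimized plan back into an AST, and until now it rebuilt TOK_TABREF without the AS OF clause. With these two branches the regenerated table reference carries the clause again, so the follow-up doPhase1 pass sees it. The resulting subtree looks roughly like this (a hand-drawn sketch of the shape, not literal toStringTree() output):

    (TOK_TABREF
      (TOK_TABNAME default orders)
      (TOK_AS_OF_TIME '2021-08-30 10:00:00')
      ...)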
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 5a034ae..99c8045 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -530,7 +530,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
       List<ASTNode> oldHints = new ArrayList<>();
       // Cache the hints before CBO runs and removes them.
       // Use the hints later in top level QB.
-        getHintsFromQB(getQB(), oldHints);
+      getHintsFromQB(getQB(), oldHints);
 
       // Note: for now, we don't actually pass the queryForCbo to CBO, because
       // it accepts qb, not AST, and can also access all the private stuff in
@@ -612,6 +612,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
             if (!doPhase1(newAST, getQB(), ctx_1, null)) {
               throw new RuntimeException("Couldn't do phase1 on CBO optimized query plan");
             }
+
             // unfortunately making prunedPartitions immutable is not possible
             // here with SemiJoins not all tables are costed in CBO, so their
             // PartitionList is not evaluated until the run phase.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 1b53e51..385a54a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -2246,15 +2246,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         if (!Optional.ofNullable(tab.getStorageHandler()).map(HiveStorageHandler::isTimeTravelAllowed).orElse(false)) {
           throw new SemanticException(ErrorMsg.TIME_TRAVEL_NOT_ALLOWED, alias);
         }
-        if (asOf.getLeft() != null) {
-          tab.setAsOfVersion(Long.parseLong(asOf.getLeft()));
-        }
-        if (asOf.getRight() != null) {
-          ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() :
-                                 SessionState.get().getConf().getLocalTimeZone();
-          TimestampTZ ts = TimestampTZUtil.parse(stripQuotes(asOf.getRight()), timeZone);
-          tab.setAsOfTimestamp(ts.toEpochMilli());
-        }
+        tab.setAsOfVersion(asOf.getLeft());
+        tab.setAsOfTimestamp(asOf.getRight());
       }
 
       if (tab.isView()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index a91cf56..ced417b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -141,9 +141,9 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
 
   private int numBuckets = -1;
 
-  private long asOfVersion = -1;
+  private String asOfVersion = null;
 
-  private long asOfTimestamp = -1;
+  private String asOfTimestamp = null;
 
   public TableScanDesc() {
     this(null, null);
@@ -536,30 +536,13 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
             : storageHandler.getOperatorDescProperties(this, opProps);
   }
 
-  @Explain(displayName = "As of version", explainLevels = { Level.EXTENDED })
-  public String getAsOfVersionText() {
-    if (asOfVersion != -1) {
-      return String.valueOf(asOfVersion);
-    } else {
-      return null;
-    }
-  }
-
-  public long getAsOfVersion() {
+  @Explain(displayName = "As of version")
+  public String getAsOfVersion() {
     return asOfVersion;
   }
 
-  @Explain(displayName = "As of timestamp", explainLevels = { Level.EXTENDED })
-  public String getAsOfTimestampText() {
-    if (asOfTimestamp != -1) {
-      DateFormat format = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
-      return format.format(new Date(asOfTimestamp));
-    } else {
-      return null;
-    }
-  }
-
-  public long getAsOfTimestamp() {
+  @Explain(displayName = "As of timestamp")
+  public String getAsOfTimestamp() {
     return asOfTimestamp;
   }
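With the fields now nullable Strings, the sentinel-checking *Text wrappers are redundant: @Explain can sit on the plain getters, since a null return simply omits the row, and the output shows the user's original literal instead of a value reformatted through SimpleDateFormat. Dropping the explainLevels restriction also makes the "As of" lines visible in default EXPLAIN output, roughly like this (a hedged sketch of the fragment, not captured output):

    TableScan
      alias: orders
      As of timestamp: '2021-08-30 10:00:00'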