Posted to commits@impala.apache.org by tm...@apache.org on 2022/08/10 07:26:39 UTC

[impala] branch master updated (308fda110 -> 3ed71756c)

This is an automated email from the ASF dual-hosted git repository.

tmater pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 308fda110 IMPALA-11481: Make test_runtime_profile_aggregate slower to avoid flakyness
     new 05c3a8e09 IMPALA-11465, IMPALA-11466: Bump CDP_BUILD_NUMBER to 30010248
     new b6d6e86f1 IMPALA-11385: Upgrade JAVA thrift components to thrift-0.16.0
     new 896ce1025 IMPALA-11362: Add expire snapshots functionality for Iceberg tables
     new 3ed71756c IMPALA-11458: Update zlib and zstd

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 bin/impala-config.sh                               |  34 ++---
 common/thrift/JniCatalog.thrift                    |  10 ++
 fe/pom.xml                                         |   7 +
 fe/src/main/cup/sql-parser.cup                     |  26 ++--
 .../impala/analysis/AlterTableExecuteStmt.java     | 120 +++++++++++++++
 .../apache/impala/catalog/CatalogHmsAPIHelper.java |   8 +-
 .../impala/catalog/CatalogServiceCatalog.java      |   3 +-
 .../catalog/metastore/CatalogHmsClientUtils.java   |   7 +-
 .../java/org/apache/impala/common/JniUtil.java     |   6 +-
 .../apache/impala/service/CatalogOpExecutor.java   |   7 +
 .../java/org/apache/impala/service/FeSupport.java  |  14 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |  10 ++
 .../java/org/apache/impala/service/JniCatalog.java |   2 +-
 .../org/apache/impala/service/JniFrontend.java     |  42 +++---
 fe/src/main/jflex/sql-scanner.flex                 |   1 +
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  26 ++++
 .../queries/QueryTest/orc-stats.test               |  11 +-
 tests/common/iceberg_test_suite.py                 |  66 +++++++++
 tests/query_test/test_iceberg.py                   | 165 ++++++++++++---------
 19 files changed, 426 insertions(+), 139 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
 create mode 100644 tests/common/iceberg_test_suite.py


[impala] 01/04: IMPALA-11465, IMPALA-11466: Bump CDP_BUILD_NUMBER to 30010248

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmater pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 05c3a8e09ce2e0180186639508b5c77eb01e919d
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Sat Jul 30 22:02:54 2022 -0700

    IMPALA-11465, IMPALA-11466: Bump CDP_BUILD_NUMBER to 30010248
    
    This patch bumps up CDP_BUILD_NUMBER to pick up Hive version
    3.1.3000.7.2.16.0-127, which contains:
    - thrift-0.16.0 upgrade from HIVE-25635.
    - Backport of ORC-517.
    
    This patch also contains a fix for IMPALA-11466 by adding jetty-server
    as an allowed dependency.
    
    Testing:
    - Built locally and confirmed that the CDP components are downloaded.
    
    Change-Id: Iff5297a48865fb2444e8ef7b9881536dc1bbf63c
    Reviewed-on: http://gerrit.cloudera.org:8080/18803
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Tested-by: Riza Suminto <ri...@cloudera.com>
---
 bin/impala-config.sh                               | 24 +++++++++++-----------
 fe/pom.xml                                         |  7 +++++++
 .../queries/QueryTest/orc-stats.test               | 11 +++-------
 3 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 753b8b13d..d8bf48e65 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -198,20 +198,20 @@ fi
 : ${IMPALA_TOOLCHAIN_HOST:=native-toolchain.s3.amazonaws.com}
 export IMPALA_TOOLCHAIN_HOST
 
-export CDP_BUILD_NUMBER=27992803
+export CDP_BUILD_NUMBER=30010248
 export CDP_MAVEN_REPOSITORY=\
 "https://${IMPALA_TOOLCHAIN_HOST}/build/cdp_components/${CDP_BUILD_NUMBER}/maven"
-export CDP_AVRO_JAVA_VERSION=1.8.2.7.2.16.0-77
-export CDP_HADOOP_VERSION=3.1.1.7.2.16.0-77
-export CDP_HBASE_VERSION=2.4.6.7.2.16.0-77
-export CDP_HIVE_VERSION=3.1.3000.7.2.16.0-77
-export CDP_ICEBERG_VERSION=0.13.1.7.2.16.0-77
-export CDP_KNOX_VERSION=1.3.0.7.2.16.0-77
-export CDP_OZONE_VERSION=1.1.0.7.2.16.0-77
-export CDP_PARQUET_VERSION=1.10.99.7.2.16.0-77
-export CDP_RANGER_VERSION=2.1.0.7.2.16.0-77
-export CDP_TEZ_VERSION=0.9.1.7.2.16.0-77
-export CDP_GCS_VERSION=2.1.2.7.2.16.0-77
+export CDP_AVRO_JAVA_VERSION=1.8.2.7.2.16.0-127
+export CDP_HADOOP_VERSION=3.1.1.7.2.16.0-127
+export CDP_HBASE_VERSION=2.4.6.7.2.16.0-127
+export CDP_HIVE_VERSION=3.1.3000.7.2.16.0-127
+export CDP_ICEBERG_VERSION=0.13.1.7.2.16.0-127
+export CDP_KNOX_VERSION=1.3.0.7.2.16.0-127
+export CDP_OZONE_VERSION=1.1.0.7.2.16.0-127
+export CDP_PARQUET_VERSION=1.10.99.7.2.16.0-127
+export CDP_RANGER_VERSION=2.3.0.7.2.16.0-127
+export CDP_TEZ_VERSION=0.9.1.7.2.16.0-127
+export CDP_GCS_VERSION=2.1.2.7.2.16.0-127
 
 # Ref: https://infra.apache.org/release-download-pages.html#closer
 : ${APACHE_MIRROR:="https://www.apache.org/dyn/closer.cgi"}
diff --git a/fe/pom.xml b/fe/pom.xml
index 25496d633..8bb051c84 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -518,6 +518,11 @@ under the License.
       <version>3.2.0-m3</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <version>9.4.31.v20200723</version>
+    </dependency>
 
     <!-- https://mvnrepository.com/artifact/org.pac4j/pac4j-saml -->
     <dependency>
@@ -859,6 +864,8 @@ under the License.
                     <include>org.eclipse.jetty:jetty-client</include>
                     <include>org.eclipse.jetty:jetty-http</include>
                     <include>org.eclipse.jetty:jetty-io</include>
+                    <!-- jetty-server is required when HiveMetaStoreClient is instantiated after HIVE-21456. -->
+                    <include>org.eclipse.jetty:jetty-server</include>
                     <!-- hadoop-yarn-common depends on some Jetty utilities. -->
                     <include>org.eclipse.jetty:jetty-util</include>
                     <!-- Include the allowed versions specifically -->
diff --git a/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test b/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
index fbe24d4d1..d43049c4d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/orc-stats.test
@@ -330,14 +330,11 @@ aggregation(SUM, RowsRead): 100
 ====
 ---- QUERY
 # Test on predicate x < min_val for decimal(9,0).
-# Due to ORC-517 not included in the current Hive version (3.1.3000.7.2.12.0-104),
-# the ORC files have wrong statistics on d1 column showing that its minimum is 0.
-# So we still see RowsRead=5 here.
 select count(*) from functional_orc_def.decimal_tbl where d1 < 1234
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-aggregation(SUM, RowsRead): 5
+aggregation(SUM, RowsRead): 0
 ====
 ---- QUERY
 select count(*) from functional_orc_def.decimal_tbl where d1 < 0
@@ -734,11 +731,9 @@ aggregation(SUM, RowsRead): 13
 ---- QUERY
 # Test pushing down IN-list predicate with literals outside the value range. Explicitly cast
 # the literals so FE won't wrap 'd1' as casting to INT, so the predicate can be pushed down.
-# Due to ORC-517 not included in the current Hive version (3.1.3000.7.2.12.0-104),
-# the ORC files have wrong statistics on d1 column showing that its minimum is 0. So here we
-# use -1 to be smaller than it. The max of d1 is 132842 so we use 132843.
+# The min and max of d1 are 1234 and 132842 so we use 1233 and 132843.
 select count(*) from functional_orc_def.decimal_tbl
-where d1 in (cast(-1 as decimal(9,0)), cast(132843 as decimal(9,0)));
+where d1 in (cast(1233 as decimal(9,0)), cast(132843 as decimal(9,0)));
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE


[impala] 02/04: IMPALA-11385: Upgrade JAVA thrift components to thrift-0.16.0

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmater pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b6d6e86f1ec3fc45dccdbdc0933a235668dae2c4
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Tue May 31 20:23:56 2022 -0700

    IMPALA-11385: Upgrade JAVA thrift components to thrift-0.16.0
    
    This patch upgrades the Java Thrift components to thrift-0.16.0. Most of
    the refactoring revolves around moving TSerializer and TDeserializer
    instantiation inside the try block, which is required after the following
    Apache Thrift commit from thrift-0.14.0:
    
    63213c1 Implements TConfiguration for java
    
    Testing:
    - Pass core tests.
    
    Change-Id: I0a146d3a36938d43225da7c12bfe676c445d3944
    Reviewed-on: http://gerrit.cloudera.org:8080/18804
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/impala-config.sh                               |  4 +--
 .../apache/impala/catalog/CatalogHmsAPIHelper.java |  8 +++--
 .../impala/catalog/CatalogServiceCatalog.java      |  3 +-
 .../catalog/metastore/CatalogHmsClientUtils.java   |  7 +++-
 .../java/org/apache/impala/common/JniUtil.java     |  6 ++--
 .../java/org/apache/impala/service/FeSupport.java  | 14 ++++----
 .../java/org/apache/impala/service/JniCatalog.java |  2 +-
 .../org/apache/impala/service/JniFrontend.java     | 42 +++++++++++-----------
 8 files changed, 48 insertions(+), 38 deletions(-)
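
The refactoring follows one pattern throughout: since thrift-0.14.0 the TSerializer and
TDeserializer constructors can themselves throw (they allocate a transport with a
TConfiguration), so the construction moves inside the existing try block next to the
serialize()/deserialize() call. A minimal sketch of the pattern, with a plain Exception
standing in for Impala's InternalException:

    import org.apache.thrift.TBase;
    import org.apache.thrift.TException;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TBinaryProtocol;

    public class ThriftSerializeExample {
      // Build the serializer inside the try block: after thrift-0.14.0 the constructor
      // itself may throw (e.g. TTransportException), not just serialize().
      public static byte[] serializeToThrift(TBase<?, ?> input) throws Exception {
        try {
          TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
          return serializer.serialize(input);
        } catch (TException e) {
          throw new Exception(e.getMessage(), e);
        }
      }
    }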

diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index d8bf48e65..b5a201136 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -180,8 +180,8 @@ unset IMPALA_CALLONCEHACK_URL
 # with Impyla and for the thrift compiler.
 export IMPALA_THRIFT_CPP_VERSION=0.16.0-p3
 unset IMPALA_THRIFT_CPP_URL
-export IMPALA_THRIFT_POM_VERSION=0.11.0
-export IMPALA_THRIFT_JAVA_VERSION=${IMPALA_THRIFT_POM_VERSION}-p5
+export IMPALA_THRIFT_POM_VERSION=0.16.0
+export IMPALA_THRIFT_JAVA_VERSION=${IMPALA_THRIFT_POM_VERSION}-p3
 unset IMPALA_THRIFT_JAVA_URL
 export IMPALA_THRIFT_PY_VERSION=0.11.0-p5
 unset IMPALA_THRIFT_PY_URL
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
index 0121cfd82..9dd7fdf6d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
@@ -466,8 +466,12 @@ public class CatalogHmsAPIHelper {
     ObjectDictionary result = new ObjectDictionary(Maps.newHashMap());
     if (networkAddresses.isEmpty()) return result;
     List<ByteBuffer> serializedAddresses = new ArrayList<>();
-    TSerializer serializer =
-        new TSerializer(new TCompactProtocol.Factory());
+    TSerializer serializer = null;
+    try {
+      serializer = new TSerializer(new TCompactProtocol.Factory());
+    } catch (TException e) {
+      throw new CatalogException("Could not create serializer. " + e.getMessage());
+    }
     for (TNetworkAddress networkAddress : networkAddresses) {
       byte[] serializedNetAddress;
       try {
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index b3a23f51f..4515784f8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -126,6 +126,7 @@ import org.apache.impala.util.ThreadNameAnnotator;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -749,7 +750,7 @@ public class CatalogServiceCatalog extends Catalog {
     TSerializer serializer;
 
     GetCatalogDeltaContext(long nativeCatalogServerPtr, long fromVersion, long toVersion,
-        long lastResetStartVersion)
+        long lastResetStartVersion) throws TTransportException
     {
       this.nativeCatalogServerPtr = nativeCatalogServerPtr;
       this.fromVersion = fromVersion;
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
index 08e30cef4..6d05747c9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
@@ -142,7 +142,12 @@ public class CatalogHmsClientUtils {
     List<TNetworkAddress> networkAddresses = Lists
         .newArrayListWithCapacity(serializedNetAddresses.size());
     int index = 0;
-    TDeserializer deserializer = new TDeserializer(new Factory());
+    TDeserializer deserializer = null;
+    try {
+      deserializer = new TDeserializer(new Factory());
+    } catch (TException e) {
+      throw new CatalogException("Could not create deserializer. " + e.getMessage());
+    }
     for (ByteBuffer serializedData : serializedNetAddresses) {
       TNetworkAddress networkAddress = new TNetworkAddress();
       try {
diff --git a/fe/src/main/java/org/apache/impala/common/JniUtil.java b/fe/src/main/java/org/apache/impala/common/JniUtil.java
index 2c718c3ba..c87604ecc 100644
--- a/fe/src/main/java/org/apache/impala/common/JniUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/JniUtil.java
@@ -102,8 +102,8 @@ public class JniUtil {
    */
   public static <T extends TBase<?, ?>>
   byte[] serializeToThrift(T input) throws ImpalaException {
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(input);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -115,8 +115,8 @@ public class JniUtil {
    */
   public static <T extends TBase<?, ?>, F extends TProtocolFactory>
   byte[] serializeToThrift(T input, F protocolFactory) throws ImpalaException {
-    TSerializer serializer = new TSerializer(protocolFactory);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory);
       return serializer.serialize(input);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -135,8 +135,8 @@ public class JniUtil {
   void deserializeThrift(F protocolFactory, T result, byte[] thriftData)
       throws ImpalaException {
     // TODO: avoid creating deserializer for each query?
-    TDeserializer deserializer = new TDeserializer(protocolFactory);
     try {
+      TDeserializer deserializer = new TDeserializer(protocolFactory);
       deserializer.deserialize(result, thriftData);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 0fabc122b..d2e593b97 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -145,9 +145,9 @@ public class FeSupport {
   public static TCacheJarResult CacheJar(String hdfsLocation) throws InternalException {
     Preconditions.checkNotNull(hdfsLocation);
     TCacheJarParams params = new TCacheJarParams(hdfsLocation);
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     byte[] result;
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       result = CacheJar(serializer.serialize(params));
       Preconditions.checkNotNull(result);
       TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
@@ -186,9 +186,9 @@ public class FeSupport {
     Preconditions.checkState(!expr.contains(SlotRef.class));
     TExprBatch exprBatch = new TExprBatch();
     exprBatch.addToExprs(expr.treeToThrift());
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     byte[] result;
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       result = EvalExprsWithoutRowBounded(
           serializer.serialize(exprBatch), serializer.serialize(queryCtx), maxResultSize);
       Preconditions.checkNotNull(result);
@@ -217,8 +217,8 @@ public class FeSupport {
 
   public static TSymbolLookupResult LookupSymbol(TSymbolLookupParams params)
       throws InternalException {
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       byte[] resultBytes = LookupSymbol(serializer.serialize(params));
       Preconditions.checkNotNull(resultBytes);
       TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
@@ -276,7 +276,6 @@ public class FeSupport {
    */
   public static TResultRow EvalPredicateBatch(List<Expr> exprs,
       TQueryCtx queryCtx) throws InternalException {
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     TExprBatch exprBatch = new TExprBatch();
     for (Expr expr: exprs) {
       // Make sure we only process boolean exprs.
@@ -286,6 +285,7 @@ public class FeSupport {
     }
     byte[] result;
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       result = EvalExprsWithoutRow(
           serializer.serialize(exprBatch), serializer.serialize(queryCtx));
       Preconditions.checkNotNull(result);
@@ -327,8 +327,8 @@ public class FeSupport {
     request.setHeader(new TCatalogServiceRequestHeader());
     request.setObject_descs(objectDescs);
 
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       byte[] result = PrioritizeLoad(serializer.serialize(request));
       Preconditions.checkNotNull(result);
       TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
@@ -362,8 +362,8 @@ public class FeSupport {
     TGetPartitionStatsRequest request = new TGetPartitionStatsRequest();
     request.setTable_name(table.toThrift());
     TGetPartitionStatsResponse response = new TGetPartitionStatsResponse();
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       byte[] result = GetPartitionStats(serializer.serialize(request));
       Preconditions.checkNotNull(result);
       TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
@@ -404,8 +404,8 @@ public class FeSupport {
     Preconditions.checkNotNull(csvQueryOptions);
     Preconditions.checkNotNull(queryOptions);
 
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       byte[] result = ParseQueryOptions(csvQueryOptions,
           serializer.serialize(queryOptions));
       Preconditions.checkNotNull(result);
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 126d08f63..6ff256e20 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -253,10 +253,10 @@ public class JniCatalog {
     long start = System.currentTimeMillis();
     TDdlExecRequest params = new TDdlExecRequest();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftDdlExecReq);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     String shortDesc = CatalogOpUtil.getShortDescForExecDdl(params);
     LOG.info("execDdl request: " + shortDesc);
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       byte[] res = serializer.serialize(catalogOpExecutor_.execDdlRequest(params));
       JniUtil.logResponse(res.length, start, params, "execDdl");
       long duration = System.currentTimeMillis() - start;
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index b805c20d7..8d7860849 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -168,8 +168,8 @@ public class JniFrontend {
     }
 
     // TODO: avoid creating serializer for each query?
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -208,8 +208,8 @@ public class JniFrontend {
     TLoadDataReq request = new TLoadDataReq();
     JniUtil.deserializeThrift(protocolFactory_, request, thriftLoadTableDataParams);
     TLoadDataResp response = frontend_.loadTableData(request);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(response);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -232,8 +232,8 @@ public class JniFrontend {
   public byte[] getCatalogMetrics() throws ImpalaException {
     Preconditions.checkNotNull(frontend_);
     TGetCatalogMetricsResult metrics = frontend_.getCatalogMetrics();
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(metrics);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -269,8 +269,8 @@ public class JniFrontend {
     TGetTablesResult result = new TGetTablesResult();
     result.setTables(tables);
 
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -289,8 +289,8 @@ public class JniFrontend {
     JniUtil.deserializeThrift(protocolFactory_, params, thriftShowFilesParams);
     TResultSet result = frontend_.getTableFiles(params);
 
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -320,8 +320,8 @@ public class JniFrontend {
     List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size());
     for (FeDb db: dbs) tDbs.add(db.toThrift());
     result.setDbs(tDbs);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -337,8 +337,8 @@ public class JniFrontend {
     TDescribeHistoryParams params = new TDescribeHistoryParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
     TGetTableHistoryResult result = frontend_.getTableHistory(params);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -368,8 +368,8 @@ public class JniFrontend {
       result.addToClass_names(dataSource.getClassName());
       result.addToApi_versions(dataSource.getApiVersion());
     }
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -391,8 +391,8 @@ public class JniFrontend {
       result = frontend_.getTableStats(params.getTable_name().getDb_name(),
           params.getTable_name().getTable_name(), params.op);
     }
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -427,8 +427,8 @@ public class JniFrontend {
     result.setFn_ret_types(retTypes);
     result.setFn_binary_types(fnBinaryTypes);
     result.setFn_persistence(fnIsPersistent);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -462,8 +462,8 @@ public class JniFrontend {
     TDescribeResult result = frontend_.describeDb(
         params.getDb(), params.getOutput_style());
 
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -492,8 +492,8 @@ public class JniFrontend {
       result = DescribeResultFactory.buildDescribeMinimalResult(structType);
     }
 
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -536,8 +536,8 @@ public class JniFrontend {
     Preconditions.checkNotNull(params.slot_types);
     TDescriptorTable result =
         DescriptorTable.buildTestDescriptorTable(params.slot_types);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       byte[] ret = serializer.serialize(result);
       return ret;
     } catch (TException e) {
@@ -552,8 +552,8 @@ public class JniFrontend {
     Preconditions.checkNotNull(frontend_);
     TShowRolesParams params = new TShowRolesParams();
     JniUtil.deserializeThrift(protocolFactory_, params, showRolesParams);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(frontend_.getAuthzManager().getRoles(params));
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -568,8 +568,8 @@ public class JniFrontend {
     Preconditions.checkNotNull(frontend_);
     TShowGrantPrincipalParams params = new TShowGrantPrincipalParams();
     JniUtil.deserializeThrift(protocolFactory_, params, showGrantPrincipalParams);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(frontend_.getAuthzManager().getPrivileges(params));
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -586,8 +586,8 @@ public class JniFrontend {
     JniUtil.deserializeThrift(protocolFactory_, params, metadataOpsParams);
     TResultSet result = frontend_.execHiveServer2MetadataOp(params);
 
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -619,8 +619,8 @@ public class JniFrontend {
     }
     TGetAllHadoopConfigsResponse result = new TGetAllHadoopConfigsResponse();
     result.setConfigs(configs);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -637,8 +637,8 @@ public class JniFrontend {
     JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
     TGetHadoopConfigResponse result = new TGetHadoopConfigResponse();
     result.setValue(CONF.get(request.getName()));
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -665,8 +665,8 @@ public class JniFrontend {
         throw new InternalException(e.getMessage());
       }
     }
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -728,8 +728,8 @@ public class JniFrontend {
     JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
     WrappedWebContext webContext = new WrappedWebContext(request, response);
     frontend_.getSaml2Client().setRedirect(webContext);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(response);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -744,8 +744,8 @@ public class JniFrontend {
     JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
     WrappedWebContext webContext = new WrappedWebContext(request, response);
     frontend_.getSaml2Client().validateAuthnResponse(webContext);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(response);
     } catch (TException e) {
       throw new InternalException(e.getMessage());


[impala] 04/04: IMPALA-11458: Update zlib and zstd

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmater pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3ed71756c4e6f575d73133ff8ce891bfebaff4e4
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Thu Aug 4 12:11:14 2022 -0700

    IMPALA-11458: Update zlib and zstd
    
    Updates zlib to 1.2.12 and zstd to 1.5.2. Requires a new toolchain
    build.
    
    Ran perf-AB-test to check for performance changes with Parquet using
    zstd and gzip. The only significant change was a slight improvement with
    gzip.
    
    Change-Id: If96fd5e8c49e581478b249bb8894001457742023
    Reviewed-on: http://gerrit.cloudera.org:8080/18816
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Joe McDonnell <jo...@cloudera.com>
---
 bin/impala-config.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index b5a201136..c3f36dfda 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -77,7 +77,7 @@ export IMPALA_VERSION=4.2.0-SNAPSHOT
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=189-8936b89482
+export IMPALA_TOOLCHAIN_BUILD_ID=198-9ebe744de2
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p5
@@ -139,7 +139,7 @@ export IMPALA_LLVM_DEBUG_VERSION=5.0.1-asserts-p4
 unset IMPALA_LLVM_DEBUG_URL
 export IMPALA_LZ4_VERSION=1.9.3
 unset IMPALA_LZ4_URL
-export IMPALA_ZSTD_VERSION=1.4.9
+export IMPALA_ZSTD_VERSION=1.5.2
 unset IMPALA_ZSTD_URL
 export IMPALA_OPENLDAP_VERSION=2.4.47
 unset IMPALA_OPENLDAP_URL
@@ -168,7 +168,7 @@ export IMPALA_TPC_DS_VERSION=2.1.0
 unset IMPALA_TPC_DS_URL
 export IMPALA_TPC_H_VERSION=2.17.0
 unset IMPALA_TPC_H_URL
-export IMPALA_ZLIB_VERSION=1.2.11
+export IMPALA_ZLIB_VERSION=1.2.12
 unset IMPALA_ZLIB_URL
 export IMPALA_CALLONCEHACK_VERSION=1.0.0
 unset IMPALA_CALLONCEHACK_URL


[impala] 03/04: IMPALA-11362: Add expire snapshots functionality for Iceberg tables

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmater pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 896ce102522f28236beb0640fbf95ad95040899a
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Mon Jun 20 13:43:17 2022 +0200

    IMPALA-11362: Add expire snapshots functionality for Iceberg tables
    
    Iceberg table modifications can create new table snapshots; these
    snapshots can be used to access an earlier version of the table. Over the
    lifetime of a table the snapshots accumulate, and older versions can
    become obsolete. The Iceberg API provides a safe way to remove snapshots
    that are no longer needed; the operation is called ExpireSnapshots.
    
    This commit adds a framework for executing Iceberg maintenance operations
    on tables and implements the expire snapshots operation. The following
    statement becomes available for Iceberg tables:
     - ALTER TABLE <tbl> EXECUTE expire_snapshots(<timestamp>)
    
    The ExpireSnapshots Iceberg API calls are meant to be chained; the calls
    are expireSnapshotId, expireOlderThan and retainLast. SQL is less
    suitable for chained calls, therefore this commit implements only the
    expireOlderThan functionality. In this case the retainLast call falls
    back to its default value (1); this default can be configured with
    TableProperties.MIN_SNAPSHOTS_TO_KEEP.
    
    This commit also refactors the Iceberg e2e tests and introduces
    IcebergTestSuite for the common methods used by tests that compare
    snapshot versions.
    
    Testing:
     - Added analysis unit tests.
     - Added e2e tests.
    
    Change-Id: Ideffee4964c18c85ca745bfb4eca08ec362416f3
    Reviewed-on: http://gerrit.cloudera.org:8080/18688
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/JniCatalog.thrift                    |  10 ++
 fe/src/main/cup/sql-parser.cup                     |  26 ++--
 .../impala/analysis/AlterTableExecuteStmt.java     | 120 +++++++++++++++
 .../apache/impala/service/CatalogOpExecutor.java   |   7 +
 .../impala/service/IcebergCatalogOpExecutor.java   |  10 ++
 fe/src/main/jflex/sql-scanner.flex                 |   1 +
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  26 ++++
 tests/common/iceberg_test_suite.py                 |  66 +++++++++
 tests/query_test/test_iceberg.py                   | 165 ++++++++++++---------
 9 files changed, 353 insertions(+), 78 deletions(-)
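
The new statement maps onto the Iceberg API in a straightforward way. A minimal sketch,
assuming 'table' is the org.apache.iceberg.Table loaded for the Impala table:

    import org.apache.iceberg.ExpireSnapshots;
    import org.apache.iceberg.Table;

    public class ExpireSnapshotsExample {
      // Rough equivalent of ALTER TABLE <tbl> EXECUTE expire_snapshots(<timestamp>):
      // only expireOlderThan() is chained, so retainLast() stays at its default of 1,
      // configurable via the history.expire.min-snapshots-to-keep table property.
      public static void expireOlderThan(Table table, long olderThanMillis) {
        ExpireSnapshots expire = table.expireSnapshots();
        expire.expireOlderThan(olderThanMillis);
        expire.commit();
      }
    }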

diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 152e7e5a5..24cd7856d 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -114,6 +114,7 @@ enum TAlterTableType {
   SET_OWNER = 16
   UNSET_TBL_PROPERTIES = 17
   SET_PARTITION_SPEC = 18
+  EXECUTE = 19
 }
 
 // Parameters of CREATE DATABASE commands
@@ -411,6 +412,12 @@ struct TAlterTableSetPartitionSpecParams {
   1: required CatalogObjects.TIcebergPartitionSpec partition_spec
 }
 
+// Parameters for ALTER TABLE EXECUTE operations.
+struct TAlterTableExecuteParams {
+  // The parameter of the ExpireSnapshot.expireOlderThan(timestampMillis) Iceberg call.
+  1: required i64 older_than_millis
+}
+
 // Parameters for all ALTER TABLE commands.
 struct TAlterTableParams {
   1: required TAlterTableType alter_type
@@ -468,6 +475,9 @@ struct TAlterTableParams {
 
   // Parameters for ALTER TABLE SET PARTITION SPEC
   19: optional TAlterTableSetPartitionSpecParams set_partition_spec_params
+
+  // Parameters for ALTER TABLE EXECUTE
+  20: optional TAlterTableExecuteParams set_execute_params
 }
 
 // Parameters of CREATE TABLE LIKE commands
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index a2afeced6..b7f85f356 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -291,16 +291,16 @@ terminal
   KW_COMPUTE, KW_CONSTRAINT, KW_COPY, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_DATA,
   KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT, KW_DELETE,
   KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISABLE, KW_DISTINCT, KW_DIV, KW_DOUBLE,
-  KW_DROP, KW_ELSE, KW_ENABLE, KW_ENCODING, KW_END, KW_ESCAPED, KW_EXCEPT, KW_EXISTS, KW_EXPLAIN,
-  KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN,
-  KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FOREIGN, KW_FORMAT, KW_FORMATTED,
-  KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_GROUPING, KW_HASH,
-  KW_HUDIPARQUET,
-  KW_IGNORE, KW_HAVING, KW_ICEBERG, KW_IF, KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER,
-  KW_INPATH, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERSECT, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP,
-  KW_IS, KW_JOIN, KW_JSONFILE, KW_KUDU, KW_LAST, KW_LEFT, KW_LEXICAL, KW_LIKE, KW_LIMIT, KW_LINES,
-  KW_LOAD, KW_LOCATION, KW_LOGICAL_OR,
-  KW_MANAGED_LOCATION, KW_MAP, KW_MERGE_FN, KW_METADATA, KW_MINUS, KW_NORELY, KW_NOT,
+  KW_DROP, KW_ELSE, KW_ENABLE, KW_ENCODING, KW_END, KW_ESCAPED, KW_EXCEPT, KW_EXECUTE,
+  KW_EXISTS, KW_EXPLAIN, KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS, KW_FILEFORMAT,
+  KW_FILES, KW_FINALIZE_FN, KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FOREIGN,
+  KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, KW_GRANT,
+  KW_GROUP, KW_GROUPING, KW_HASH, KW_HUDIPARQUET, KW_IGNORE, KW_HAVING, KW_ICEBERG, KW_IF,
+  KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
+  KW_INTERMEDIATE, KW_INTERSECT, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS,
+  KW_JOIN, KW_JSONFILE, KW_KUDU, KW_LAST, KW_LEFT, KW_LEXICAL, KW_LIKE, KW_LIMIT, KW_LINES,
+  KW_LOAD, KW_LOCATION, KW_LOGICAL_OR, KW_MANAGED_LOCATION, KW_MAP, KW_MERGE_FN,
+  KW_METADATA, KW_MINUS, KW_NORELY, KW_NOT,
   KW_NOVALIDATE, KW_NULL, KW_NULLS, KW_OF, KW_OFFSET, KW_ON, KW_OR,
   KW_ORC, KW_ORDER, KW_OUTER,
   KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION, KW_PARTITIONED,
@@ -1315,6 +1315,10 @@ alter_tbl_stmt ::=
     IcebergPartitionSpec ips = new IcebergPartitionSpec(cols);
     RESULT = new AlterTableSetPartitionSpecStmt(table, ips);
   :}
+  | KW_ALTER KW_TABLE table_name:table KW_EXECUTE function_call_expr:expr
+  {:
+    RESULT = new AlterTableExecuteStmt(table, expr);
+  :}
   ;
 
 table_property_type ::=
@@ -4139,6 +4143,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_EXISTS:r
   {: RESULT = r.toString(); :}
+  | KW_EXECUTE:r
+  {: RESULT = r.toString(); :}
   | KW_EXPLAIN:r
   {: RESULT = r.toString(); :}
   | KW_EXTENDED:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
new file mode 100644
index 000000000..141aa386d
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.util.ExprUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents an ALTER TABLE <tbl> EXECUTE <operation>(<parameters>) statement on Iceberg
+ * tables, supported operations:
+ *  - expire_snapshots(<timestamp>): uses the ExpireSnapshot API to expire snapshots,
+ *    calls the ExpireSnapshot.expireOlderThan(timestampMillis) method.
+ *    TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
+ *    should be retained even when all snapshots are selected by expireOlderThan().
+ */
+public class AlterTableExecuteStmt extends AlterTableStmt {
+  private final static Logger LOG = LoggerFactory.getLogger(AlterTableExecuteStmt.class);
+
+  private final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
+
+  // Expression of the function call after EXECUTE keyword. Parsed into an operation and
+  // a value of that operation.
+  private FunctionCallExpr fnCallExpr_;
+
+  // Value expression from fnCallExpr_.
+  private Expr fnParamValue_;
+
+  // The value extracted from the fnParamValue_ expression.
+  private long olderThanMillis_ = -1;
+
+  protected AlterTableExecuteStmt(TableName tableName, Expr fnCallExpr) {
+    super(tableName);
+    fnCallExpr_ = (FunctionCallExpr)fnCallExpr;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
+    analyzeFunctionCallExpr(analyzer);
+    analyzeOlderThan(analyzer);
+  }
+
+  private void analyzeFunctionCallExpr(Analyzer analyzer) throws AnalysisException {
+    // fnCallExpr_ analyzed here manually, because it is not an actual function but a
+    // catalog operation.
+    String fnName = fnCallExpr_.getFnName().toString();
+    if (!fnName.toUpperCase().equals("EXPIRE_SNAPSHOTS")) {
+      throw new AnalysisException(String.format("'%s' is not supported by ALTER " +
+          "TABLE <table> EXECUTE. Supported operation is %s.", fnName, USAGE));
+    }
+    if (fnCallExpr_.getParams().size() != 1) {
+      throw new AnalysisException(USAGE + " must have one parameter: " + toSql());
+    }
+    fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
+  }
+
+  private void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkNotNull(fnParamValue_);
+    fnParamValue_.analyze(analyzer);
+    if (!fnParamValue_.isConstant()) {
+      throw new AnalysisException(USAGE + " must be a constant expression: " + toSql());
+    }
+    if (fnParamValue_.getType().isStringType()) {
+      fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
+    }
+    if (!fnParamValue_.getType().isTimestamp()) {
+      throw new AnalysisException(USAGE + " must be a timestamp type but is '" +
+          fnParamValue_.getType() + "': " + fnParamValue_.toSql());
+    }
+    try {
+      olderThanMillis_ =
+          ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
+      LOG.debug(USAGE + " millis: " + String.valueOf(olderThanMillis_));
+    } catch (InternalException ie) {
+      throw new AnalysisException("Invalid TIMESTAMP expression has been given to " +
+          USAGE + ": " + ie.getMessage(), ie);
+    }
+  }
+
+  @Override
+  public String toSql(ToSqlOptions options) {
+    return fnCallExpr_.toSql();
+  }
+
+  @Override
+  public TAlterTableParams toThrift() {
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.EXECUTE);
+    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+    executeParams.setOlder_than_millis(olderThanMillis_);
+    params.setSet_execute_params(executeParams);
+    return params;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index d641b90ca..1181f2b2e 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1287,6 +1287,7 @@ public class CatalogOpExecutor {
   private boolean altersIcebergTable(TAlterTableType type) {
     return type == TAlterTableType.ADD_COLUMNS
         || type == TAlterTableType.REPLACE_COLUMNS
+        || type == TAlterTableType.EXECUTE
         || type == TAlterTableType.DROP_COLUMN
         || type == TAlterTableType.ALTER_COLUMN
         || type == TAlterTableType.SET_PARTITION_SPEC
@@ -1322,6 +1323,12 @@ public class CatalogOpExecutor {
                alterColParams.getNew_col_def());
           addSummary(response, "Column has been altered.");
           break;
+        case EXECUTE:
+          Preconditions.checkState(params.isSetSet_execute_params());
+          String summary = IcebergCatalogOpExecutor.alterTableExecute(tbl,
+              params.getSet_execute_params());
+          addSummary(response, summary);
+          break;
         case SET_PARTITION_SPEC:
           // Set partition spec uses 'TableOperations', not transactions.
           needsTxn = false;
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 0745a04d2..2a482e2bb 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.ExpireSnapshots;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
@@ -50,6 +51,7 @@ import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergColumnStats;
 import org.apache.impala.fb.FbIcebergDataFile;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergOperationParam;
@@ -175,6 +177,14 @@ public class IcebergCatalogOpExecutor {
     tableOp.commit(metadata, newMetadata);
   }
 
+  public static String alterTableExecute(FeIcebergTable tbl,
+      TAlterTableExecuteParams params) {
+    ExpireSnapshots expireApi = tbl.getIcebergApiTable().expireSnapshots();
+    expireApi.expireOlderThan(params.older_than_millis);
+    expireApi.commit();
+    return "Snapshots have been expired.";
+  }
+
   /**
    * Drops a column from a Iceberg table.
    */
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index e60676a9c..79786ed41 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -125,6 +125,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("escaped", SqlParserSymbols.KW_ESCAPED);
     keywordMap.put("except", SqlParserSymbols.KW_EXCEPT);
     keywordMap.put("exists", SqlParserSymbols.KW_EXISTS);
+    keywordMap.put("execute", SqlParserSymbols.KW_EXECUTE);
     keywordMap.put("explain", SqlParserSymbols.KW_EXPLAIN);
     keywordMap.put("extended", SqlParserSymbols.KW_EXTENDED);
     keywordMap.put("external", SqlParserSymbols.KW_EXTERNAL);
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index e8c8e4778..292443425 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -4138,6 +4138,32 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     }
   }
 
+  @Test
+  public void TestAlterExecuteExpireSnapshots() {
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "expire_snapshots(now() - interval 20 years);");
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "expire_snapshots('2022-01-04 10:00:00');");
+
+    // Negative tests
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "unsupported_operation(123456789);", "'unsupported_operation' is not supported " +
+        "by ALTER TABLE <table> EXECUTE. Supported operation is " +
+        "EXPIRE_SNAPSHOTS(<expression>)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "expire_snapshots(now(), 3);", "EXPIRE_SNAPSHOTS(<expression>) must have one " +
+        "parameter: expire_snapshots(now(), 3)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "expire_snapshots(id);", "EXPIRE_SNAPSHOTS(<expression>) must be a constant " +
+        "expression: expire_snapshots(id)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "expire_snapshots(42);", "EXPIRE_SNAPSHOTS(<expression>) must be a timestamp " +
+        "type but is 'TINYINT': 42");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "expire_snapshots('2021-02-32 15:52:45');", "Invalid TIMESTAMP expression has" +
+        " been given to EXPIRE_SNAPSHOTS(<expression>)");
+  }
+
   private static String buildLongOwnerName() {
     StringBuilder comment = new StringBuilder();
     for (int i = 0; i < MetaStoreUtil.MAX_OWNER_LENGTH + 5; i++) {
diff --git a/tests/common/iceberg_test_suite.py b/tests/common/iceberg_test_suite.py
new file mode 100644
index 000000000..b9a8f76ac
--- /dev/null
+++ b/tests/common/iceberg_test_suite.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+class IcebergTestSuite(ImpalaTestSuite):
+
+  @classmethod
+  def quote(cls, s):
+    return "'{0}'".format(s)
+
+  @classmethod
+  def cast_ts(cls, ts):
+    return "CAST({0} as timestamp)".format(cls.quote(ts))
+
+  @classmethod
+  def execute_query_ts(cls, impalad_client, query):
+    """Executes the given query then returns the time when it finished."""
+    impalad_client.execute(query)
+    return datetime.datetime.now()
+
+  @classmethod
+  def expect_num_snapshots_from(cls, impalad_client, tbl_name, ts, expected_result_size):
+    """Executes DESCRIBE HISTORY <tbl> FROM through the given client. Verifies if the
+       result snapshots are newer than the provided timestamp and checks the expected
+       number of results."""
+    query = "DESCRIBE HISTORY {0} FROM {1};".format(tbl_name, cls.cast_ts(ts))
+    data = impalad_client.execute(query)
+    assert len(data.data) == expected_result_size
+    for i in range(len(data.data)):
+      result_ts_dt = cls.parse_timestamp(data.data[i].split('\t')[0])
+      assert result_ts_dt >= ts
+
+  @classmethod
+  def parse_timestamp(cls, ts_string):
+    """The client can receive the timestamp in two formats, if the timestamp has
+    fractional seconds "yyyy-MM-dd HH:mm:ss.SSSSSSSSS" pattern is used, otherwise
+    "yyyy-MM-dd HH:mm:ss". Additionally, Python's datetime library cannot handle
+    nanoseconds, therefore in that case the timestamp has to be trimmed."""
+    if len(ts_string.split('.')) > 1:
+      return datetime.datetime.strptime(ts_string[:-3], '%Y-%m-%d %H:%M:%S.%f')
+    else:
+      return datetime.datetime.strptime(ts_string, '%Y-%m-%d %H:%M:%S')
+
+  @classmethod
+  def impala_now(cls, impalad_client):
+    now_data = impalad_client.execute("select now()")
+    now_data_ts_dt = cls.parse_timestamp(now_data.data[0])
+    return now_data_ts_dt
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 766cd573b..0fe7c8358 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -16,6 +16,7 @@
 # under the License.
 
 import datetime
+import logging
 import os
 import pytest
 import random
@@ -29,14 +30,18 @@ from avro.io import DatumReader
 import json
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
-from tests.common.impala_test_suite import ImpalaTestSuite, LOG
+from tests.common.iceberg_test_suite import IcebergTestSuite
+
 from tests.common.skip import SkipIf
 
 from tests.util.filesystem_utils import get_fs_path, IS_HDFS
 from tests.util.get_parquet_metadata import get_parquet_metadata
 from tests.common.file_utils import create_iceberg_table_from_directory
 
-class TestIcebergTable(ImpalaTestSuite):
+LOG = logging.getLogger(__name__)
+
+
+class TestIcebergTable(IcebergTestSuite):
   """Tests related to Iceberg tables."""
 
   @classmethod
@@ -58,6 +63,61 @@ class TestIcebergTable(ImpalaTestSuite):
   def test_alter_iceberg_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-alter', vector, use_db=unique_database)
 
+  def test_expire_snapshots(self, vector, unique_database):
+    tbl_name = unique_database + ".expire_snapshots"
+
+    # We are setting the TIMEZONE query option in this test, so let's create a local
+    # impala client.
+    with self.create_impala_client() as impalad_client:
+      # Iceberg doesn't create a snapshot entry for the initial empty table
+      impalad_client.execute("create table {0} (i int) stored as iceberg"
+          .format(tbl_name))
+      ts_0 = datetime.datetime.now()
+      insert_q = "insert into {0} values (1)".format(tbl_name)
+      ts_1 = self.execute_query_ts(impalad_client, insert_q)
+      time.sleep(5)
+      impalad_client.execute(insert_q)
+      time.sleep(5)
+      ts_2 = self.execute_query_ts(impalad_client, insert_q)
+      impalad_client.execute(insert_q)
+
+      # There should be 4 snapshots initially
+      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
+      # Expire the oldest snapshot and test that the oldest one was expired
+      expire_q = "alter table {0} execute expire_snapshots({1})"
+      impalad_client.execute(expire_q.format(tbl_name, self.cast_ts(ts_1)))
+      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
+      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
+
+      # Expire with a timestamp in which the interval does not touch existing snapshot
+      impalad_client.execute(expire_q.format(tbl_name, self.cast_ts(ts_1)))
+      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
+
+      # Expire all, but retain 1
+      impalad_client.execute(expire_q.format(tbl_name,
+          self.cast_ts(datetime.datetime.now())))
+      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
+
+      # Change number of retained snapshots, then expire all
+      impalad_client.execute("""alter table {0} set tblproperties
+          ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
+      impalad_client.execute(insert_q)
+      impalad_client.execute(insert_q)
+      impalad_client.execute(expire_q.format(tbl_name,
+          self.cast_ts(datetime.datetime.now())))
+      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
+
+      # Check that the timestamp is interpreted in the local timezone controlled by
+      # the query option TIMEZONE.
+      impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+      impalad_client.execute(insert_q)
+      ts_tokyo = self.impala_now(impalad_client)
+      impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
+      impalad_client.execute(insert_q)
+      impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+      impalad_client.execute(expire_q.format(tbl_name, self.cast_ts(ts_tokyo)))
+      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
+
   def test_truncate_iceberg_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
 
@@ -177,67 +237,36 @@ class TestIcebergTable(ImpalaTestSuite):
   def test_describe_history_params(self, vector, unique_database):
     tbl_name = unique_database + ".describe_history"
 
-    def execute_query_ts(query):
-      impalad_client.execute(query)
-      return datetime.datetime.now()
-
-    def expect_results_from(ts, expected_result_size):
-      query = "DESCRIBE HISTORY {0} FROM {1};".format(tbl_name, cast_ts(ts))
-      data = impalad_client.execute(query)
-      assert len(data.data) == expected_result_size
-      for i in range(len(data.data)):
-        result_ts_dt = parse_timestamp(data.data[i].split('\t')[0])
-        assert result_ts_dt >= ts
-
     def expect_results_between(ts_start, ts_end, expected_result_size):
       query = "DESCRIBE HISTORY {0} BETWEEN {1} AND {2};".format(
-        tbl_name, cast_ts(ts_start), cast_ts(ts_end))
+        tbl_name, self.cast_ts(ts_start), self.cast_ts(ts_end))
       data = impalad_client.execute(query)
       assert len(data.data) == expected_result_size
       for i in range(len(data.data)):
-        result_ts_dt = parse_timestamp(data.data[i].split('\t')[0])
+        result_ts_dt = self.parse_timestamp(data.data[i].split('\t')[0])
         assert result_ts_dt >= ts_start and result_ts_dt <= ts_end
 
-    def parse_timestamp(ts_string):
-      """The client can receive the timestamp in two formats, if the timestamp has
-      fractional seconds "yyyy-MM-dd HH:mm:ss.SSSSSSSSS" pattern is used, otherwise
-      "yyyy-MM-dd HH:mm:ss". Additionally, Python's datetime library cannot handle
-      nanoseconds, therefore in that case the timestamp has to be trimmed."""
-      if len(ts_string.split('.')) > 1:
-        return datetime.datetime.strptime(ts_string[:-3], '%Y-%m-%d %H:%M:%S.%f')
-      else:
-        return datetime.datetime.strptime(ts_string, '%Y-%m-%d %H:%M:%S')
-
-    def quote(s):
-      return "'{0}'".format(s)
-
-    def cast_ts(ts):
-      return "CAST({0} as timestamp)".format(quote(ts))
-
-    def impala_now():
-      now_data = impalad_client.execute("select now()")
-      now_data_ts_dt = parse_timestamp(now_data.data[0])
-      return now_data_ts_dt
-
     # We are setting the TIMEZONE query option in this test, so let's create a local
     # impala client.
     with self.create_impala_client() as impalad_client:
       # Iceberg doesn't create a snapshot entry for the initial empty table
       impalad_client.execute("create table {0} (i int) stored as iceberg"
           .format(tbl_name))
-      ts_1 = execute_query_ts("insert into {0} values (1)".format(tbl_name))
+      insert_q = "insert into {0} values (1)".format(tbl_name)
+      ts_1 = self.execute_query_ts(impalad_client, insert_q)
       time.sleep(5)
-      ts_2 = execute_query_ts("insert into {0} values (2)".format(tbl_name))
+      ts_2 = self.execute_query_ts(impalad_client, insert_q)
       time.sleep(5)
-      ts_3 = execute_query_ts("insert into {0} values (3)".format(tbl_name))
+      ts_3 = self.execute_query_ts(impalad_client, insert_q)
       # Describe history without predicate
       data = impalad_client.execute("DESCRIBE HISTORY {0}".format(tbl_name))
       assert len(data.data) == 3
 
       # Describe history with FROM predicate
-      expect_results_from(ts_1 - datetime.timedelta(hours=1), 3)
-      expect_results_from(ts_1, 2)
-      expect_results_from(ts_3, 0)
+      self.expect_num_snapshots_from(impalad_client, tbl_name,
+          ts_1 - datetime.timedelta(hours=1), 3)
+      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 2)
+      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_3, 0)
 
       # Describe history with BETWEEN <ts> AND <ts> predicate
       expect_results_between(ts_1, ts_2, 1)
@@ -248,26 +277,22 @@ class TestIcebergTable(ImpalaTestSuite):
       # Check that timezone is interpreted in local timezone controlled by query option
       # TIMEZONE. Persist the local times first and create a new snapshot.
       impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
-      now_tokyo = impala_now()
+      now_tokyo = self.impala_now(impalad_client)
       impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
-      now_budapest = impala_now()
-      execute_query_ts("insert into {0} values (4)".format(tbl_name))
-      expect_results_from(now_budapest, 1)
+      now_budapest = self.impala_now(impalad_client)
+      self.execute_query_ts(impalad_client, "insert into {0} values (4)".format(tbl_name))
+      self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 1)
 
       # Let's switch to Tokyo time. Tokyo time is always greater than Budapest time.
       impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
-      expect_results_from(now_tokyo, 1)
+      self.expect_num_snapshots_from(impalad_client, tbl_name, now_tokyo, 1)
 
       # Interpreting Budapest time in Tokyo time points to the past.
-      expect_results_from(now_budapest, 4)
+      self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 4)
 
   def test_time_travel(self, vector, unique_database):
     tbl_name = unique_database + ".time_travel"
 
-    def execute_query_ts(query):
-      impalad_client.execute(query)
-      return str(datetime.datetime.now())
-
     def expect_results(query, expected_results):
       data = impalad_client.execute(query)
       assert len(data.data) == len(expected_results)
@@ -324,22 +349,26 @@ class TestIcebergTable(ImpalaTestSuite):
     # impala client.
     with self.create_impala_client() as impalad_client:
       # Iceberg doesn't create a snapshot entry for the initial empty table
-      impalad_client.execute("create table {0} (i int) stored as iceberg".format(tbl_name))
-      ts_1 = execute_query_ts("insert into {0} values (1)".format(tbl_name))
-      ts_2 = execute_query_ts("insert into {0} values (2)".format(tbl_name))
-      ts_3 = execute_query_ts("truncate table {0}".format(tbl_name))
+      impalad_client.execute("create table {0} (i int) stored as iceberg"
+          .format(tbl_name))
+      ts_1 = self.execute_query_ts(impalad_client, "insert into {0} values (1)"
+          .format(tbl_name))
+      ts_2 = self.execute_query_ts(impalad_client, "insert into {0} values (2)"
+          .format(tbl_name))
+      ts_3 = self.execute_query_ts(impalad_client, "truncate table {0}".format(tbl_name))
       time.sleep(5)
-      ts_4 = execute_query_ts("insert into {0} values (100)".format(tbl_name))
+      ts_4 = self.execute_query_ts(impalad_client, "insert into {0} values (100)"
+          .format(tbl_name))
       # Query table as of timestamps.
       expect_results_t("now()", ['100'])
-      expect_results_t(quote(ts_1), ['1'])
-      expect_results_t(quote(ts_2), ['1', '2'])
-      expect_results_t(quote(ts_3), [])
-      expect_results_t(cast_ts(ts_3) + " + interval 1 seconds", [])
-      expect_results_t(quote(ts_4), ['100'])
-      expect_results_t(cast_ts(ts_4) + " - interval 5 seconds", [])
+      expect_results_t(self.quote(ts_1), ['1'])
+      expect_results_t(self.quote(ts_2), ['1', '2'])
+      expect_results_t(self.quote(ts_3), [])
+      expect_results_t(self.cast_ts(ts_3) + " + interval 1 seconds", [])
+      expect_results_t(self.quote(ts_4), ['100'])
+      expect_results_t(self.cast_ts(ts_4) + " - interval 5 seconds", [])
       # Future queries return the current snapshot.
-      expect_results_t(cast_ts(ts_4) + " + interval 1 hours", ['100'])
+      expect_results_t(self.cast_ts(ts_4) + " + interval 1 hours", ['100'])
       # Query table as of snapshot IDs.
       snapshots = get_snapshots()
       expect_results_v(snapshots[0], ['1'])
@@ -406,16 +435,16 @@ class TestIcebergTable(ImpalaTestSuite):
       impalad_client.execute("insert into {0} values (1111)".format(tbl_name))
       impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
       now_budapest = impala_now()
-      expect_results_t(quote(now_budapest), ['1111'])
+      expect_results_t(self.quote(now_budapest), ['1111'])
 
       # Let's switch to Tokyo time. Tokyo time is always greater than Budapest time.
       impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
       now_tokyo = impala_now()
-      expect_results_t(quote(now_tokyo), ['1111'])
+      expect_results_t(self.quote(now_tokyo), ['1111'])
       try:
         # Interpreting Budapest time in Tokyo time points to the past when the table
         # didn't exist.
-        expect_results_t(quote(now_budapest), [])
+        expect_results_t(self.quote(now_budapest), [])
         assert False
       except Exception as e:
         assert "Cannot find a snapshot older than" in str(e)