You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2022/08/10 07:26:41 UTC

[impala] 02/04: IMPALA-11385: Upgrade JAVA thrift components to thrift-0.16.0

This is an automated email from the ASF dual-hosted git repository.

tmater pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b6d6e86f1ec3fc45dccdbdc0933a235668dae2c4
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Tue May 31 20:23:56 2022 -0700

    IMPALA-11385: Upgrade JAVA thrift components to thrift-0.16.0
    
    This patch upgrades the JAVA thrift components to thrift-0.16.0. Most of
    the refactoring revolves around moving TSerializer and TDeserializer
    instantiation inside try blocks, because their constructors can throw
    a TException after the following Apache Thrift commit from thrift-0.14.0:
    
    63213c1 Implements TConfiguration for java
    
    Testing:
    - Pass core tests.
    
    Change-Id: I0a146d3a36938d43225da7c12bfe676c445d3944
    Reviewed-on: http://gerrit.cloudera.org:8080/18804
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/impala-config.sh                               |  4 +--
 .../apache/impala/catalog/CatalogHmsAPIHelper.java |  8 +++--
 .../impala/catalog/CatalogServiceCatalog.java      |  3 +-
 .../catalog/metastore/CatalogHmsClientUtils.java   |  7 +++-
 .../java/org/apache/impala/common/JniUtil.java     |  6 ++--
 .../java/org/apache/impala/service/FeSupport.java  | 14 ++++----
 .../java/org/apache/impala/service/JniCatalog.java |  2 +-
 .../org/apache/impala/service/JniFrontend.java     | 42 +++++++++++-----------
 8 files changed, 48 insertions(+), 38 deletions(-)

diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index d8bf48e65..b5a201136 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -180,8 +180,8 @@ unset IMPALA_CALLONCEHACK_URL
 # with Impyla and for the thrift compiler.
 export IMPALA_THRIFT_CPP_VERSION=0.16.0-p3
 unset IMPALA_THRIFT_CPP_URL
-export IMPALA_THRIFT_POM_VERSION=0.11.0
-export IMPALA_THRIFT_JAVA_VERSION=${IMPALA_THRIFT_POM_VERSION}-p5
+export IMPALA_THRIFT_POM_VERSION=0.16.0
+export IMPALA_THRIFT_JAVA_VERSION=${IMPALA_THRIFT_POM_VERSION}-p3
 unset IMPALA_THRIFT_JAVA_URL
 export IMPALA_THRIFT_PY_VERSION=0.11.0-p5
 unset IMPALA_THRIFT_PY_URL
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
index 0121cfd82..9dd7fdf6d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
@@ -466,8 +466,12 @@ public class CatalogHmsAPIHelper {
     ObjectDictionary result = new ObjectDictionary(Maps.newHashMap());
     if (networkAddresses.isEmpty()) return result;
     List<ByteBuffer> serializedAddresses = new ArrayList<>();
-    TSerializer serializer =
-        new TSerializer(new TCompactProtocol.Factory());
+    TSerializer serializer = null;
+    try {
+      serializer = new TSerializer(new TCompactProtocol.Factory());
+    } catch (TException e) {
+      throw new CatalogException("Could not create serializer. " + e.getMessage());
+    }
     for (TNetworkAddress networkAddress : networkAddresses) {
       byte[] serializedNetAddress;
       try {
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index b3a23f51f..4515784f8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -126,6 +126,7 @@ import org.apache.impala.util.ThreadNameAnnotator;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -749,7 +750,7 @@ public class CatalogServiceCatalog extends Catalog {
     TSerializer serializer;
 
     GetCatalogDeltaContext(long nativeCatalogServerPtr, long fromVersion, long toVersion,
-        long lastResetStartVersion)
+        long lastResetStartVersion) throws TTransportException
     {
       this.nativeCatalogServerPtr = nativeCatalogServerPtr;
       this.fromVersion = fromVersion;
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
index 08e30cef4..6d05747c9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
@@ -142,7 +142,12 @@ public class CatalogHmsClientUtils {
     List<TNetworkAddress> networkAddresses = Lists
         .newArrayListWithCapacity(serializedNetAddresses.size());
     int index = 0;
-    TDeserializer deserializer = new TDeserializer(new Factory());
+    TDeserializer deserializer = null;
+    try {
+      deserializer = new TDeserializer(new Factory());
+    } catch (TException e) {
+      throw new CatalogException("Could not create deserializer. " + e.getMessage());
+    }
     for (ByteBuffer serializedData : serializedNetAddresses) {
       TNetworkAddress networkAddress = new TNetworkAddress();
       try {
diff --git a/fe/src/main/java/org/apache/impala/common/JniUtil.java b/fe/src/main/java/org/apache/impala/common/JniUtil.java
index 2c718c3ba..c87604ecc 100644
--- a/fe/src/main/java/org/apache/impala/common/JniUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/JniUtil.java
@@ -102,8 +102,8 @@ public class JniUtil {
    */
   public static <T extends TBase<?, ?>>
   byte[] serializeToThrift(T input) throws ImpalaException {
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(input);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -115,8 +115,8 @@ public class JniUtil {
    */
   public static <T extends TBase<?, ?>, F extends TProtocolFactory>
   byte[] serializeToThrift(T input, F protocolFactory) throws ImpalaException {
-    TSerializer serializer = new TSerializer(protocolFactory);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory);
       return serializer.serialize(input);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -135,8 +135,8 @@ public class JniUtil {
   void deserializeThrift(F protocolFactory, T result, byte[] thriftData)
       throws ImpalaException {
     // TODO: avoid creating deserializer for each query?
-    TDeserializer deserializer = new TDeserializer(protocolFactory);
     try {
+      TDeserializer deserializer = new TDeserializer(protocolFactory);
       deserializer.deserialize(result, thriftData);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 0fabc122b..d2e593b97 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -145,9 +145,9 @@ public class FeSupport {
   public static TCacheJarResult CacheJar(String hdfsLocation) throws InternalException {
     Preconditions.checkNotNull(hdfsLocation);
     TCacheJarParams params = new TCacheJarParams(hdfsLocation);
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     byte[] result;
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       result = CacheJar(serializer.serialize(params));
       Preconditions.checkNotNull(result);
       TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
@@ -186,9 +186,9 @@ public class FeSupport {
     Preconditions.checkState(!expr.contains(SlotRef.class));
     TExprBatch exprBatch = new TExprBatch();
     exprBatch.addToExprs(expr.treeToThrift());
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     byte[] result;
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       result = EvalExprsWithoutRowBounded(
           serializer.serialize(exprBatch), serializer.serialize(queryCtx), maxResultSize);
       Preconditions.checkNotNull(result);
@@ -217,8 +217,8 @@ public class FeSupport {
 
   public static TSymbolLookupResult LookupSymbol(TSymbolLookupParams params)
       throws InternalException {
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       byte[] resultBytes = LookupSymbol(serializer.serialize(params));
       Preconditions.checkNotNull(resultBytes);
       TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
@@ -276,7 +276,6 @@ public class FeSupport {
    */
   public static TResultRow EvalPredicateBatch(List<Expr> exprs,
       TQueryCtx queryCtx) throws InternalException {
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     TExprBatch exprBatch = new TExprBatch();
     for (Expr expr: exprs) {
       // Make sure we only process boolean exprs.
@@ -286,6 +285,7 @@ public class FeSupport {
     }
     byte[] result;
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       result = EvalExprsWithoutRow(
           serializer.serialize(exprBatch), serializer.serialize(queryCtx));
       Preconditions.checkNotNull(result);
@@ -327,8 +327,8 @@ public class FeSupport {
     request.setHeader(new TCatalogServiceRequestHeader());
     request.setObject_descs(objectDescs);
 
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       byte[] result = PrioritizeLoad(serializer.serialize(request));
       Preconditions.checkNotNull(result);
       TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
@@ -362,8 +362,8 @@ public class FeSupport {
     TGetPartitionStatsRequest request = new TGetPartitionStatsRequest();
     request.setTable_name(table.toThrift());
     TGetPartitionStatsResponse response = new TGetPartitionStatsResponse();
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       byte[] result = GetPartitionStats(serializer.serialize(request));
       Preconditions.checkNotNull(result);
       TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
@@ -404,8 +404,8 @@ public class FeSupport {
     Preconditions.checkNotNull(csvQueryOptions);
     Preconditions.checkNotNull(queryOptions);
 
-    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
       byte[] result = ParseQueryOptions(csvQueryOptions,
           serializer.serialize(queryOptions));
       Preconditions.checkNotNull(result);
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 126d08f63..6ff256e20 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -253,10 +253,10 @@ public class JniCatalog {
     long start = System.currentTimeMillis();
     TDdlExecRequest params = new TDdlExecRequest();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftDdlExecReq);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     String shortDesc = CatalogOpUtil.getShortDescForExecDdl(params);
     LOG.info("execDdl request: " + shortDesc);
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(shortDesc)) {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       byte[] res = serializer.serialize(catalogOpExecutor_.execDdlRequest(params));
       JniUtil.logResponse(res.length, start, params, "execDdl");
       long duration = System.currentTimeMillis() - start;
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index b805c20d7..8d7860849 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -168,8 +168,8 @@ public class JniFrontend {
     }
 
     // TODO: avoid creating serializer for each query?
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -208,8 +208,8 @@ public class JniFrontend {
     TLoadDataReq request = new TLoadDataReq();
     JniUtil.deserializeThrift(protocolFactory_, request, thriftLoadTableDataParams);
     TLoadDataResp response = frontend_.loadTableData(request);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(response);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -232,8 +232,8 @@ public class JniFrontend {
   public byte[] getCatalogMetrics() throws ImpalaException {
     Preconditions.checkNotNull(frontend_);
     TGetCatalogMetricsResult metrics = frontend_.getCatalogMetrics();
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(metrics);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -269,8 +269,8 @@ public class JniFrontend {
     TGetTablesResult result = new TGetTablesResult();
     result.setTables(tables);
 
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -289,8 +289,8 @@ public class JniFrontend {
     JniUtil.deserializeThrift(protocolFactory_, params, thriftShowFilesParams);
     TResultSet result = frontend_.getTableFiles(params);
 
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -320,8 +320,8 @@ public class JniFrontend {
     List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size());
     for (FeDb db: dbs) tDbs.add(db.toThrift());
     result.setDbs(tDbs);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -337,8 +337,8 @@ public class JniFrontend {
     TDescribeHistoryParams params = new TDescribeHistoryParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
     TGetTableHistoryResult result = frontend_.getTableHistory(params);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -368,8 +368,8 @@ public class JniFrontend {
       result.addToClass_names(dataSource.getClassName());
       result.addToApi_versions(dataSource.getApiVersion());
     }
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -391,8 +391,8 @@ public class JniFrontend {
       result = frontend_.getTableStats(params.getTable_name().getDb_name(),
           params.getTable_name().getTable_name(), params.op);
     }
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -427,8 +427,8 @@ public class JniFrontend {
     result.setFn_ret_types(retTypes);
     result.setFn_binary_types(fnBinaryTypes);
     result.setFn_persistence(fnIsPersistent);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -462,8 +462,8 @@ public class JniFrontend {
     TDescribeResult result = frontend_.describeDb(
         params.getDb(), params.getOutput_style());
 
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -492,8 +492,8 @@ public class JniFrontend {
       result = DescribeResultFactory.buildDescribeMinimalResult(structType);
     }
 
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -536,8 +536,8 @@ public class JniFrontend {
     Preconditions.checkNotNull(params.slot_types);
     TDescriptorTable result =
         DescriptorTable.buildTestDescriptorTable(params.slot_types);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       byte[] ret = serializer.serialize(result);
       return ret;
     } catch (TException e) {
@@ -552,8 +552,8 @@ public class JniFrontend {
     Preconditions.checkNotNull(frontend_);
     TShowRolesParams params = new TShowRolesParams();
     JniUtil.deserializeThrift(protocolFactory_, params, showRolesParams);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(frontend_.getAuthzManager().getRoles(params));
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -568,8 +568,8 @@ public class JniFrontend {
     Preconditions.checkNotNull(frontend_);
     TShowGrantPrincipalParams params = new TShowGrantPrincipalParams();
     JniUtil.deserializeThrift(protocolFactory_, params, showGrantPrincipalParams);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(frontend_.getAuthzManager().getPrivileges(params));
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -586,8 +586,8 @@ public class JniFrontend {
     JniUtil.deserializeThrift(protocolFactory_, params, metadataOpsParams);
     TResultSet result = frontend_.execHiveServer2MetadataOp(params);
 
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -619,8 +619,8 @@ public class JniFrontend {
     }
     TGetAllHadoopConfigsResponse result = new TGetAllHadoopConfigsResponse();
     result.setConfigs(configs);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -637,8 +637,8 @@ public class JniFrontend {
     JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
     TGetHadoopConfigResponse result = new TGetHadoopConfigResponse();
     result.setValue(CONF.get(request.getName()));
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -665,8 +665,8 @@ public class JniFrontend {
         throw new InternalException(e.getMessage());
       }
     }
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(result);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -728,8 +728,8 @@ public class JniFrontend {
     JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
     WrappedWebContext webContext = new WrappedWebContext(request, response);
     frontend_.getSaml2Client().setRedirect(webContext);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(response);
     } catch (TException e) {
       throw new InternalException(e.getMessage());
@@ -744,8 +744,8 @@ public class JniFrontend {
     JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
     WrappedWebContext webContext = new WrappedWebContext(request, response);
     frontend_.getSaml2Client().validateAuthnResponse(webContext);
-    TSerializer serializer = new TSerializer(protocolFactory_);
     try {
+      TSerializer serializer = new TSerializer(protocolFactory_);
       return serializer.serialize(response);
     } catch (TException e) {
       throw new InternalException(e.getMessage());