Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:31 UTC
[14/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/service/DescribeResultFactory.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/DescribeResultFactory.java b/fe/src/main/java/com/cloudera/impala/service/DescribeResultFactory.java
deleted file mode 100644
index c1a9557..0000000
--- a/fe/src/main/java/com/cloudera/impala/service/DescribeResultFactory.java
+++ /dev/null
@@ -1,246 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.service;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
-import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
-
-import com.cloudera.impala.catalog.Column;
-import com.cloudera.impala.catalog.Db;
-import com.cloudera.impala.catalog.StructField;
-import com.cloudera.impala.catalog.StructType;
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.thrift.TColumnValue;
-import com.cloudera.impala.thrift.TDescribeOutputStyle;
-import com.cloudera.impala.thrift.TDescribeResult;
-import com.cloudera.impala.thrift.TResultRow;
-import com.google.common.collect.Lists;
-
-/*
- * Builds results for DESCRIBE DATABASE statements by constructing and
- * populating a TDescribeResult object.
- */
-public class DescribeResultFactory {
- // Number of columns in each row of the DESCRIBE FORMATTED|EXTENDED result set.
- private final static int NUM_DESC_FORMATTED_RESULT_COLS = 3;
- // Empty column used to format description output table.
- private final static TColumnValue EMPTY = new TColumnValue().setString_val("");
-
- public static TDescribeResult buildDescribeDbResult(Db db,
- TDescribeOutputStyle outputFormat) {
- switch (outputFormat) {
- case MINIMAL: return describeDbMinimal(db);
- case FORMATTED:
- case EXTENDED:
- return describeDbExtended(db);
- default: throw new UnsupportedOperationException(
- "Unknown TDescribeOutputStyle value for describe database: " + outputFormat);
- }
- }
-
- /*
- * Builds results for a DESCRIBE DATABASE <db> command. This consists of the database
- * location and comment.
- */
- private static TDescribeResult describeDbMinimal(Db db) {
- TDescribeResult descResult = new TDescribeResult();
-
- org.apache.hadoop.hive.metastore.api.Database msDb = db.getMetaStoreDb();
- descResult.results = Lists.newArrayList();
- String location = null;
- String comment = null;
- if (msDb != null) {
- location = msDb.getLocationUri();
- comment = msDb.getDescription();
- }
-
- TColumnValue dbNameCol = new TColumnValue();
- dbNameCol.setString_val(db.getName());
- TColumnValue dbLocationCol = new TColumnValue();
- dbLocationCol.setString_val(Objects.toString(location, ""));
- TColumnValue commentCol = new TColumnValue();
- commentCol.setString_val(Objects.toString(comment, ""));
- descResult.results.add(
- new TResultRow(Lists.newArrayList(dbNameCol, dbLocationCol, commentCol)));
- return descResult;
- }
-
- /*
- * Helper function used to build privilege results.
- */
- private static void buildPrivilegeResult(
- TDescribeResult descResult, Map<String, List<PrivilegeGrantInfo>> privilegeMap) {
- if (privilegeMap == null) return;
-
- for (Map.Entry<String, List<PrivilegeGrantInfo>> privilegeEntry:
- privilegeMap.entrySet()) {
- TColumnValue title = new TColumnValue();
- title.setString_val("Privileges for " + privilegeEntry.getKey() + ": ");
- descResult.results.add(
- new TResultRow(Lists.newArrayList(title, EMPTY, EMPTY)));
- for (PrivilegeGrantInfo privilegeInfo: privilegeEntry.getValue()) {
- TColumnValue privilege = new TColumnValue();
- privilege.setString_val(
- privilegeInfo.getPrivilege() + " " + privilegeInfo.isGrantOption());
- TColumnValue grantor = new TColumnValue();
- grantor.setString_val(
- privilegeInfo.getGrantor() + " " + privilegeInfo.getGrantorType());
- TColumnValue grantTime = new TColumnValue();
- grantTime.setString_val(privilegeInfo.getCreateTime() + "");
- descResult.results.add(
- new TResultRow(Lists.newArrayList(privilege, grantor, grantTime)));
- }
- }
- }
-
- /*
- * Builds a TDescribeResult that contains the result of a DESCRIBE FORMATTED|EXTENDED
- * DATABASE <db> command. Outputs all of the database's properties.
- */
- private static TDescribeResult describeDbExtended(Db db) {
- TDescribeResult descResult = describeDbMinimal(db);
- org.apache.hadoop.hive.metastore.api.Database msDb = db.getMetaStoreDb();
- String ownerName = null;
- PrincipalType ownerType = null;
- Map<String, String> params = null;
- PrincipalPrivilegeSet privileges = null;
- if (msDb != null) {
- ownerName = msDb.getOwnerName();
- ownerType = msDb.getOwnerType();
- params = msDb.getParameters();
- privileges = msDb.getPrivileges();
- }
-
- if (ownerName != null && ownerType != null) {
- TColumnValue owner = new TColumnValue();
- owner.setString_val("Owner: ");
- TResultRow ownerRow =
- new TResultRow(Lists.newArrayList(owner, EMPTY, EMPTY));
- descResult.results.add(ownerRow);
-
- TColumnValue ownerNameCol = new TColumnValue();
- ownerNameCol.setString_val(Objects.toString(ownerName, ""));
- TColumnValue ownerTypeCol = new TColumnValue();
- ownerTypeCol.setString_val(Objects.toString(ownerType, ""));
- descResult.results.add(
- new TResultRow(Lists.newArrayList(EMPTY, ownerNameCol, ownerTypeCol)));
- }
-
- if (params != null && params.size() > 0) {
- TColumnValue parameter = new TColumnValue();
- parameter.setString_val("Parameter: ");
- TResultRow parameterRow =
- new TResultRow(Lists.newArrayList(parameter, EMPTY, EMPTY));
- descResult.results.add(parameterRow);
- for (Map.Entry<String, String> param: params.entrySet()) {
- TColumnValue key = new TColumnValue();
- key.setString_val(Objects.toString(param.getKey(), ""));
- TColumnValue val = new TColumnValue();
- val.setString_val(Objects.toString(param.getValue(), ""));
- descResult.results.add(
- new TResultRow(Lists.newArrayList(EMPTY, key, val)));
- }
- }
-
- // Currently we only retrieve privileges stored in hive metastore.
- // TODO: Retrieve privileges from Catalog
- if (privileges != null) {
- buildPrivilegeResult(descResult, privileges.getUserPrivileges());
- buildPrivilegeResult(descResult, privileges.getGroupPrivileges());
- buildPrivilegeResult(descResult, privileges.getRolePrivileges());
- }
- return descResult;
- }
-
- /*
- * Builds a TDescribeResult that contains the result of a DESCRIBE FORMATTED|EXTENDED
- * <table> command. For the formatted describe output the goal is to be exactly the
- * same as what Hive (via HiveServer2) outputs, for compatibility reasons. To do this,
- * Hive's MetadataFormatUtils class is used to build the results.
- */
- public static TDescribeResult buildDescribeFormattedResult(Table table) {
- TDescribeResult descResult = new TDescribeResult();
- descResult.results = Lists.newArrayList();
-
- org.apache.hadoop.hive.metastore.api.Table msTable =
- table.getMetaStoreTable().deepCopy();
- // For some table formats (e.g. Avro) the column list in the table can differ from the
- // one returned by the Hive metastore. To handle this we use the column list from the
- // table which has already reconciled those differences.
- msTable.getSd().setCols(Column.toFieldSchemas(table.getNonClusteringColumns()));
- msTable.setPartitionKeys(Column.toFieldSchemas(table.getClusteringColumns()));
-
- // To avoid initializing any of the SerDe classes in the metastore table Thrift
- // struct, create the ql.metadata.Table object by calling the empty c'tor and
- // then calling setTTable().
- org.apache.hadoop.hive.ql.metadata.Table hiveTable =
- new org.apache.hadoop.hive.ql.metadata.Table();
- hiveTable.setTTable(msTable);
- StringBuilder sb = new StringBuilder();
- // First add all the columns (includes partition columns).
- sb.append(MetaDataFormatUtils.getAllColumnsInformation(msTable.getSd().getCols(),
- msTable.getPartitionKeys(), true, false, true));
- // Add the extended table metadata information.
- sb.append(MetaDataFormatUtils.getTableInformation(hiveTable));
-
- for (String line: sb.toString().split("\n")) {
- // To match Hive's HiveServer2 output, split each line into multiple column
- // values based on the field delimiter.
- String[] columns = line.split(MetaDataFormatUtils.FIELD_DELIM);
- TResultRow resultRow = new TResultRow();
- for (int i = 0; i < NUM_DESC_FORMATTED_RESULT_COLS; ++i) {
- TColumnValue colVal = new TColumnValue();
- colVal.setString_val(null);
- if (columns.length > i) {
- // Add the column value.
- colVal.setString_val(columns[i]);
- }
- resultRow.addToColVals(colVal);
- }
- descResult.results.add(resultRow);
- }
- return descResult;
- }
-
- /*
- * Builds a TDescribeResult that contains the result of a DESCRIBE <path> command:
- * the names and types of fields of the table or complex type referred to by the path.
- */
- public static TDescribeResult buildDescribeMinimalResult(StructType type) {
- TDescribeResult descResult = new TDescribeResult();
- descResult.results = Lists.newArrayList();
-
- for (StructField field: type.getFields()) {
- TColumnValue colNameCol = new TColumnValue();
- colNameCol.setString_val(field.getName());
- TColumnValue dataTypeCol = new TColumnValue();
- dataTypeCol.setString_val(field.getType().prettyPrint().toLowerCase());
- TColumnValue commentCol = new TColumnValue();
- commentCol.setString_val(field.getComment() != null ? field.getComment() : "");
- descResult.results.add(
- new TResultRow(Lists.newArrayList(colNameCol, dataTypeCol, commentCol)));
- }
- return descResult;
- }
-}
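
A note on the row-building pattern the deleted DescribeResultFactory relied on throughout: each output row is a TResultRow wrapping string-valued TColumnValue cells, collected into the public results list of a TDescribeResult. Below is a minimal sketch of that pattern, assuming only the Thrift-generated setters and constructors visible in the file above; the database name and location values are hypothetical placeholders, not real Impala defaults.

import com.cloudera.impala.thrift.TColumnValue;
import com.cloudera.impala.thrift.TDescribeResult;
import com.cloudera.impala.thrift.TResultRow;
import com.google.common.collect.Lists;

public class DescribeRowSketch {
  public static TDescribeResult singleRowResult() {
    // Each cell is a TColumnValue carrying a string; empty strings stand in
    // for missing values, mirroring describeDbMinimal() above.
    TColumnValue name = new TColumnValue();
    name.setString_val("functional");                        // hypothetical db name
    TColumnValue location = new TColumnValue();
    location.setString_val("hdfs://nn:8020/test-warehouse"); // hypothetical URI
    TColumnValue comment = new TColumnValue();
    comment.setString_val("");
    // A result is a list of rows; each row wraps its cells in column order.
    TDescribeResult result = new TDescribeResult();
    result.results = Lists.newArrayList();
    result.results.add(
        new TResultRow(Lists.newArrayList(name, location, comment)));
    return result;
  }
}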
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/FeSupport.java b/fe/src/main/java/com/cloudera/impala/service/FeSupport.java
deleted file mode 100644
index 4014129..0000000
--- a/fe/src/main/java/com/cloudera/impala/service/FeSupport.java
+++ /dev/null
@@ -1,292 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.service;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.BoolLiteral;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.NullLiteral;
-import com.cloudera.impala.analysis.TableName;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.thrift.TCacheJarParams;
-import com.cloudera.impala.thrift.TCacheJarResult;
-import com.cloudera.impala.thrift.TCatalogObject;
-import com.cloudera.impala.thrift.TCatalogObjectType;
-import com.cloudera.impala.thrift.TCatalogServiceRequestHeader;
-import com.cloudera.impala.thrift.TColumnValue;
-import com.cloudera.impala.thrift.TExprBatch;
-import com.cloudera.impala.thrift.TPrioritizeLoadRequest;
-import com.cloudera.impala.thrift.TPrioritizeLoadResponse;
-import com.cloudera.impala.thrift.TQueryCtx;
-import com.cloudera.impala.thrift.TResultRow;
-import com.cloudera.impala.thrift.TStatus;
-import com.cloudera.impala.thrift.TStartupOptions;
-import com.cloudera.impala.thrift.TSymbolLookupParams;
-import com.cloudera.impala.thrift.TSymbolLookupResult;
-import com.cloudera.impala.thrift.TTable;
-import com.cloudera.impala.util.NativeLibUtil;
-import com.google.common.base.Preconditions;
-
-/**
- * This class provides the Impala executor functionality to the FE.
- * fe-support.cc implements all the native calls.
- * If the planner is executed inside the impalad, the impalad will have registered all
- * the JNI native functions already, so there is no need to load the shared library.
- * For unit tests (mvn test), the shared library must be loaded explicitly because the
- * native functions have not been registered yet.
- */
-public class FeSupport {
- private final static Logger LOG = LoggerFactory.getLogger(FeSupport.class);
- private static boolean loaded_ = false;
-
- // Only called if this library is explicitly loaded. This only happens
- // when running FE tests.
- public native static void NativeFeTestInit();
-
- // Returns a serialized TResultRow
- public native static byte[] NativeEvalConstExprs(byte[] thriftExprBatch,
- byte[] thriftQueryGlobals);
-
- // Returns a serialized TSymbolLookupResult
- public native static byte[] NativeLookupSymbol(byte[] thriftSymbolLookup);
-
- // Returns a serialized TCacheJarResult
- public native static byte[] NativeCacheJar(byte[] thriftCacheJar);
-
- // Does an RPC to the Catalog Server to prioritize the metadata loading of
- // one or more catalog objects. To keep our Kerberos configuration consolidated,
- // we make all RPCs in the BE layer instead of calling the Catalog Server
- // using Java Thrift bindings.
- public native static byte[] NativePrioritizeLoad(byte[] thriftReq);
-
- // Returns selected BE startup options as a serialized TStartupOptions
- public native static byte[] NativeGetStartupOptions();
-
- /**
- * Locally caches the jar at the specified HDFS location.
- *
- * @param hdfsLocation The path to the jar in HDFS
- * @return The result of the call to cache the jar, which includes a status and the
- * local path of the cached jar if the operation was successful.
- */
- public static TCacheJarResult CacheJar(String hdfsLocation) throws InternalException {
- Preconditions.checkNotNull(hdfsLocation);
- TCacheJarParams params = new TCacheJarParams(hdfsLocation);
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
- byte[] result;
- try {
- result = CacheJar(serializer.serialize(params));
- Preconditions.checkNotNull(result);
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- TCacheJarResult thriftResult = new TCacheJarResult();
- deserializer.deserialize(thriftResult, result);
- return thriftResult;
- } catch (TException e) {
- // this should never happen
- throw new InternalException(
- "Couldn't cache jar at HDFS location " + hdfsLocation, e);
- }
- }
-
- private static byte[] CacheJar(byte[] thriftParams) {
- try {
- return NativeCacheJar(thriftParams);
- } catch (UnsatisfiedLinkError e) {
- loadLibrary();
- }
- return NativeCacheJar(thriftParams);
- }
-
- public static TColumnValue EvalConstExpr(Expr expr, TQueryCtx queryCtx)
- throws InternalException {
- Preconditions.checkState(expr.isConstant());
- TExprBatch exprBatch = new TExprBatch();
- exprBatch.addToExprs(expr.treeToThrift());
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
- byte[] result;
- try {
- result = EvalConstExprs(serializer.serialize(exprBatch),
- serializer.serialize(queryCtx));
- Preconditions.checkNotNull(result);
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- TResultRow val = new TResultRow();
- deserializer.deserialize(val, result);
- Preconditions.checkState(val.getColValsSize() == 1);
- return val.getColVals().get(0);
- } catch (TException e) {
- // this should never happen
- throw new InternalException("couldn't execute expr " + expr.toSql(), e);
- }
- }
-
- private static byte[] LookupSymbol(byte[] thriftParams) {
- try {
- return NativeLookupSymbol(thriftParams);
- } catch (UnsatisfiedLinkError e) {
- loadLibrary();
- }
- return NativeLookupSymbol(thriftParams);
- }
-
- public static TSymbolLookupResult LookupSymbol(TSymbolLookupParams params)
- throws InternalException {
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
- try {
- byte[] resultBytes = LookupSymbol(serializer.serialize(params));
- Preconditions.checkNotNull(resultBytes);
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- TSymbolLookupResult result = new TSymbolLookupResult();
- deserializer.deserialize(result, resultBytes);
- return result;
- } catch (TException e) {
- // this should never happen
- throw new InternalException("couldn't perform symbol lookup.", e);
- }
- }
-
- private static byte[] EvalConstExprs(byte[] thriftExprBatch,
- byte[] thriftQueryContext) {
- try {
- return NativeEvalConstExprs(thriftExprBatch, thriftQueryContext);
- } catch (UnsatisfiedLinkError e) {
- loadLibrary();
- }
- return NativeEvalConstExprs(thriftExprBatch, thriftQueryContext);
- }
-
- public static boolean EvalPredicate(Expr pred, TQueryCtx queryCtx)
- throws InternalException {
- // Shortcuts to avoid expensive BE evaluation.
- if (pred instanceof BoolLiteral) return ((BoolLiteral) pred).getValue();
- if (pred instanceof NullLiteral) return false;
- Preconditions.checkState(pred.getType().isBoolean());
- TColumnValue val = EvalConstExpr(pred, queryCtx);
- // Return false if pred evaluated to false or NULL. True otherwise.
- return val.isBool_val() && val.bool_val;
- }
-
- /**
- * Evaluate a batch of predicates in the BE. The results are stored in a
- * TResultRow object, where each TColumnValue in it stores the result of
- * a predicate evaluation.
- *
- * TODO: This function is currently used for improving the performance of
- * partition pruning (see IMPALA-887), hence it only supports boolean
- * exprs. In the future, we can extend it to support arbitrary constant exprs.
- */
- public static TResultRow EvalPredicateBatch(ArrayList<Expr> exprs,
- TQueryCtx queryCtx) throws InternalException {
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
- TExprBatch exprBatch = new TExprBatch();
- for (Expr expr: exprs) {
- // Make sure we only process boolean exprs.
- Preconditions.checkState(expr.getType().isBoolean());
- Preconditions.checkState(expr.isConstant());
- exprBatch.addToExprs(expr.treeToThrift());
- }
- byte[] result;
- try {
- result = EvalConstExprs(serializer.serialize(exprBatch),
- serializer.serialize(queryCtx));
- Preconditions.checkNotNull(result);
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- TResultRow val = new TResultRow();
- deserializer.deserialize(val, result);
- return val;
- } catch (TException e) {
- // this should never happen
- throw new InternalException("couldn't execute a batch of exprs.", e);
- }
- }
-
- private static byte[] PrioritizeLoad(byte[] thriftReq) {
- try {
- return NativePrioritizeLoad(thriftReq);
- } catch (UnsatisfiedLinkError e) {
- loadLibrary();
- }
- return NativePrioritizeLoad(thriftReq);
- }
-
- public static TStatus PrioritizeLoad(Set<TableName> tableNames)
- throws InternalException {
- Preconditions.checkNotNull(tableNames);
-
- List<TCatalogObject> objectDescs = new ArrayList<TCatalogObject>(tableNames.size());
- for (TableName tableName: tableNames) {
- TCatalogObject catalogObject = new TCatalogObject();
- catalogObject.setType(TCatalogObjectType.TABLE);
- catalogObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
- objectDescs.add(catalogObject);
- }
-
- TPrioritizeLoadRequest request = new TPrioritizeLoadRequest();
- request.setHeader(new TCatalogServiceRequestHeader());
- request.setObject_descs(objectDescs);
-
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
- try {
- byte[] result = PrioritizeLoad(serializer.serialize(request));
- Preconditions.checkNotNull(result);
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- TPrioritizeLoadResponse response = new TPrioritizeLoadResponse();
- deserializer.deserialize(response, result);
- return response.getStatus();
- } catch (TException e) {
- // this should never happen
- throw new InternalException("Error processing request: " + e.getMessage(), e);
- }
- }
-
- public static TStartupOptions GetStartupOptions() throws InternalException {
- try {
- byte[] result = NativeGetStartupOptions();
- Preconditions.checkNotNull(result);
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- TStartupOptions options = new TStartupOptions();
- deserializer.deserialize(options, result);
- return options;
- } catch (TException e) {
- throw new InternalException("Error retrieving startup options: " + e.getMessage(),
- e);
- }
- }
-
- /**
- * This function should only be called explicitly by the FeSupport to ensure that
- * native functions are loaded.
- */
- private static synchronized void loadLibrary() {
- if (loaded_) return;
- LOG.info("Loading libfesupport.so");
- NativeLibUtil.loadLibrary("libfesupport.so");
- LOG.info("Loaded libfesupport.so");
- loaded_ = true;
- NativeFeTestInit();
- }
-}
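
The FeSupport wrappers above all follow the same lazy-loading idiom: attempt the native call, and only on UnsatisfiedLinkError (the first call from a test JVM, where impalad has not registered the JNI symbols) load the shared library once and retry. Here is a stripped-down sketch of that idiom with hypothetical names; System.loadLibrary stands in for NativeLibUtil.loadLibrary, which the real class uses.

public class NativeStub {
  private static boolean loaded = false;

  // Hypothetical native entry point; in FeSupport these are implemented
  // in fe-support.cc.
  private static native byte[] nativeCall(byte[] serializedRequest);

  public static byte[] call(byte[] serializedRequest) {
    try {
      return nativeCall(serializedRequest);
    } catch (UnsatisfiedLinkError e) {
      // JNI symbol missing: we are in a test JVM, so load the library once.
      loadLibrary();
    }
    return nativeCall(serializedRequest);
  }

  private static synchronized void loadLibrary() {
    if (loaded) return;
    // Assumption: System.loadLibrary("fesupport") as a stand-in for
    // NativeLibUtil.loadLibrary("libfesupport.so").
    System.loadLibrary("fesupport");
    loaded = true;
  }
}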
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
deleted file mode 100644
index 2d38396..0000000
--- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java
+++ /dev/null
@@ -1,1231 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.service;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.cloudera.impala.catalog.KuduTable;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hive.service.cli.thrift.TGetColumnsReq;
-import org.apache.hive.service.cli.thrift.TGetFunctionsReq;
-import org.apache.hive.service.cli.thrift.TGetSchemasReq;
-import org.apache.hive.service.cli.thrift.TGetTablesReq;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.AnalysisContext;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.CreateDataSrcStmt;
-import com.cloudera.impala.analysis.CreateDropRoleStmt;
-import com.cloudera.impala.analysis.CreateUdaStmt;
-import com.cloudera.impala.analysis.CreateUdfStmt;
-import com.cloudera.impala.analysis.DropDataSrcStmt;
-import com.cloudera.impala.analysis.DropFunctionStmt;
-import com.cloudera.impala.analysis.DropStatsStmt;
-import com.cloudera.impala.analysis.DropTableOrViewStmt;
-import com.cloudera.impala.analysis.GrantRevokePrivStmt;
-import com.cloudera.impala.analysis.GrantRevokeRoleStmt;
-import com.cloudera.impala.analysis.InsertStmt;
-import com.cloudera.impala.analysis.QueryStmt;
-import com.cloudera.impala.analysis.ResetMetadataStmt;
-import com.cloudera.impala.analysis.ShowFunctionsStmt;
-import com.cloudera.impala.analysis.ShowGrantRoleStmt;
-import com.cloudera.impala.analysis.ShowRolesStmt;
-import com.cloudera.impala.analysis.TableName;
-import com.cloudera.impala.analysis.TruncateStmt;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.authorization.AuthorizationChecker;
-import com.cloudera.impala.authorization.AuthorizationConfig;
-import com.cloudera.impala.authorization.ImpalaInternalAdminUser;
-import com.cloudera.impala.authorization.PrivilegeRequest;
-import com.cloudera.impala.authorization.PrivilegeRequestBuilder;
-import com.cloudera.impala.authorization.User;
-import com.cloudera.impala.catalog.AuthorizationException;
-import com.cloudera.impala.catalog.Catalog;
-import com.cloudera.impala.catalog.CatalogException;
-import com.cloudera.impala.catalog.Column;
-import com.cloudera.impala.catalog.DataSource;
-import com.cloudera.impala.catalog.DataSourceTable;
-import com.cloudera.impala.catalog.DatabaseNotFoundException;
-import com.cloudera.impala.catalog.Db;
-import com.cloudera.impala.catalog.Function;
-import com.cloudera.impala.catalog.HBaseTable;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.catalog.ImpaladCatalog;
-import com.cloudera.impala.catalog.StructType;
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.catalog.TableId;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.FileSystemUtil;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.common.NotImplementedException;
-import com.cloudera.impala.common.RuntimeEnv;
-import com.cloudera.impala.planner.PlanFragment;
-import com.cloudera.impala.planner.Planner;
-import com.cloudera.impala.planner.ScanNode;
-import com.cloudera.impala.thrift.TCatalogOpRequest;
-import com.cloudera.impala.thrift.TCatalogOpType;
-import com.cloudera.impala.thrift.TCatalogServiceRequestHeader;
-import com.cloudera.impala.thrift.TColumn;
-import com.cloudera.impala.thrift.TColumnType;
-import com.cloudera.impala.thrift.TColumnValue;
-import com.cloudera.impala.thrift.TCreateDropRoleParams;
-import com.cloudera.impala.thrift.TDdlExecRequest;
-import com.cloudera.impala.thrift.TDdlType;
-import com.cloudera.impala.thrift.TDescribeOutputStyle;
-import com.cloudera.impala.thrift.TDescribeResult;
-import com.cloudera.impala.thrift.TErrorCode;
-import com.cloudera.impala.thrift.TExecRequest;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TExplainResult;
-import com.cloudera.impala.thrift.TFinalizeParams;
-import com.cloudera.impala.thrift.TFunctionCategory;
-import com.cloudera.impala.thrift.TGrantRevokePrivParams;
-import com.cloudera.impala.thrift.TGrantRevokeRoleParams;
-import com.cloudera.impala.thrift.TLineageGraph;
-import com.cloudera.impala.thrift.TLoadDataReq;
-import com.cloudera.impala.thrift.TLoadDataResp;
-import com.cloudera.impala.thrift.TMetadataOpRequest;
-import com.cloudera.impala.thrift.TPlanFragment;
-import com.cloudera.impala.thrift.TPlanFragmentTree;
-import com.cloudera.impala.thrift.TQueryCtx;
-import com.cloudera.impala.thrift.TQueryExecRequest;
-import com.cloudera.impala.thrift.TResetMetadataRequest;
-import com.cloudera.impala.thrift.TResultRow;
-import com.cloudera.impala.thrift.TResultSet;
-import com.cloudera.impala.thrift.TResultSetMetadata;
-import com.cloudera.impala.thrift.TShowFilesParams;
-import com.cloudera.impala.thrift.TStatus;
-import com.cloudera.impala.thrift.TStmtType;
-import com.cloudera.impala.thrift.TTableName;
-import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest;
-import com.cloudera.impala.thrift.TUpdateCatalogCacheResponse;
-import com.cloudera.impala.thrift.TUpdateMembershipRequest;
-import com.cloudera.impala.util.EventSequence;
-import com.cloudera.impala.util.MembershipSnapshot;
-import com.cloudera.impala.util.PatternMatcher;
-import com.cloudera.impala.util.TResultRowBuilder;
-import com.cloudera.impala.util.TSessionStateUtil;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Frontend API for the impalad process.
- * This class allows the impala daemon to create TQueryExecRequests
- * in response to TClientRequests. It also manages the authorization
- * policy.
- */
-public class Frontend {
- private final static Logger LOG = LoggerFactory.getLogger(Frontend.class);
- // Time to wait for missing tables to be loaded before timing out.
- private final long MISSING_TBL_LOAD_WAIT_TIMEOUT_MS = 2 * 60 * 1000;
-
- // Max time to wait for a catalog update notification.
- private final long MAX_CATALOG_UPDATE_WAIT_TIME_MS = 2 * 1000;
-
- //TODO: Make the reload interval configurable.
- private static final int AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS = 5 * 60;
-
- private ImpaladCatalog impaladCatalog_;
- private final AuthorizationConfig authzConfig_;
- private final AtomicReference<AuthorizationChecker> authzChecker_;
- private final ScheduledExecutorService policyReader_ =
- Executors.newScheduledThreadPool(1);
-
- public Frontend(AuthorizationConfig authorizationConfig) {
- this(authorizationConfig, new ImpaladCatalog());
- }
-
- /**
- * C'tor used by tests to pass in a custom ImpaladCatalog.
- */
- public Frontend(AuthorizationConfig authorizationConfig, ImpaladCatalog catalog) {
- authzConfig_ = authorizationConfig;
- impaladCatalog_ = catalog;
- authzChecker_ = new AtomicReference<AuthorizationChecker>(
- new AuthorizationChecker(authzConfig_, impaladCatalog_.getAuthPolicy()));
- // If authorization is enabled, reload the policy on a regular basis.
- if (authzConfig_.isEnabled() && authzConfig_.isFileBasedPolicy()) {
- // Stagger the reads across nodes
- Random randomGen = new Random(UUID.randomUUID().hashCode());
- int delay = AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS + randomGen.nextInt(60);
-
- policyReader_.scheduleAtFixedRate(
- new AuthorizationPolicyReader(authzConfig_),
- delay, AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS, TimeUnit.SECONDS);
- }
- }
-
- /**
- * Reads (and caches) an authorization policy from HDFS.
- */
- private class AuthorizationPolicyReader implements Runnable {
- private final AuthorizationConfig config_;
-
- public AuthorizationPolicyReader(AuthorizationConfig config) {
- config_ = config;
- }
-
- @Override
- public void run() {
- try {
- LOG.info("Reloading authorization policy file from: " + config_.getPolicyFile());
- authzChecker_.set(new AuthorizationChecker(config_,
- getCatalog().getAuthPolicy()));
- } catch (Exception e) {
- LOG.error("Error reloading policy file: ", e);
- }
- }
- }
-
- public ImpaladCatalog getCatalog() { return impaladCatalog_; }
- public AuthorizationChecker getAuthzChecker() { return authzChecker_.get(); }
-
- public TUpdateCatalogCacheResponse updateCatalogCache(
- TUpdateCatalogCacheRequest req) throws CatalogException {
- ImpaladCatalog catalog = impaladCatalog_;
-
- // If this is not a delta, this update should replace the current
- // Catalog contents, so create a new catalog and populate it.
- if (!req.is_delta) catalog = new ImpaladCatalog();
-
- TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
-
- if (!req.is_delta) {
- // This was not a delta update. Now that the catalog has been updated,
- // replace the references to impaladCatalog_/authzChecker_ to ensure
- // clients don't see the catalog disappear.
- impaladCatalog_ = catalog;
- authzChecker_.set(new AuthorizationChecker(authzConfig_,
- impaladCatalog_.getAuthPolicy()));
- }
- return response;
- }
-
- /**
- * Update the cluster membership snapshot with the latest snapshot from the backend.
- */
- public void updateMembership(TUpdateMembershipRequest req) {
- MembershipSnapshot.update(req);
- }
-
- /**
- * Constructs a TCatalogOpRequest and attaches it, plus any metadata, to the
- * result argument.
- */
- private void createCatalogOpRequest(AnalysisContext.AnalysisResult analysis,
- TExecRequest result) throws InternalException {
- TCatalogOpRequest ddl = new TCatalogOpRequest();
- TResultSetMetadata metadata = new TResultSetMetadata();
- if (analysis.isUseStmt()) {
- ddl.op_type = TCatalogOpType.USE;
- ddl.setUse_db_params(analysis.getUseStmt().toThrift());
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isShowTablesStmt()) {
- ddl.op_type = TCatalogOpType.SHOW_TABLES;
- ddl.setShow_tables_params(analysis.getShowTablesStmt().toThrift());
- metadata.setColumns(Arrays.asList(
- new TColumn("name", Type.STRING.toThrift())));
- } else if (analysis.isShowDbsStmt()) {
- ddl.op_type = TCatalogOpType.SHOW_DBS;
- ddl.setShow_dbs_params(analysis.getShowDbsStmt().toThrift());
- metadata.setColumns(Arrays.asList(
- new TColumn("name", Type.STRING.toThrift()),
- new TColumn("comment", Type.STRING.toThrift())));
- } else if (analysis.isShowDataSrcsStmt()) {
- ddl.op_type = TCatalogOpType.SHOW_DATA_SRCS;
- ddl.setShow_data_srcs_params(analysis.getShowDataSrcsStmt().toThrift());
- metadata.setColumns(Arrays.asList(
- new TColumn("name", Type.STRING.toThrift()),
- new TColumn("location", Type.STRING.toThrift()),
- new TColumn("class name", Type.STRING.toThrift()),
- new TColumn("api version", Type.STRING.toThrift())));
- } else if (analysis.isShowStatsStmt()) {
- ddl.op_type = TCatalogOpType.SHOW_STATS;
- ddl.setShow_stats_params(analysis.getShowStatsStmt().toThrift());
- metadata.setColumns(Arrays.asList(
- new TColumn("name", Type.STRING.toThrift())));
- } else if (analysis.isShowFunctionsStmt()) {
- ddl.op_type = TCatalogOpType.SHOW_FUNCTIONS;
- ShowFunctionsStmt stmt = (ShowFunctionsStmt)analysis.getStmt();
- ddl.setShow_fns_params(stmt.toThrift());
- metadata.setColumns(Arrays.asList(
- new TColumn("return type", Type.STRING.toThrift()),
- new TColumn("signature", Type.STRING.toThrift()),
- new TColumn("binary type", Type.STRING.toThrift()),
- new TColumn("is persistent", Type.STRING.toThrift())));
- } else if (analysis.isShowCreateTableStmt()) {
- ddl.op_type = TCatalogOpType.SHOW_CREATE_TABLE;
- ddl.setShow_create_table_params(analysis.getShowCreateTableStmt().toThrift());
- metadata.setColumns(Arrays.asList(
- new TColumn("result", Type.STRING.toThrift())));
- } else if (analysis.isShowCreateFunctionStmt()) {
- ddl.op_type = TCatalogOpType.SHOW_CREATE_FUNCTION;
- ddl.setShow_create_function_params(analysis.getShowCreateFunctionStmt().toThrift());
- metadata.setColumns(Arrays.asList(
- new TColumn("result", Type.STRING.toThrift())));
- } else if (analysis.isShowFilesStmt()) {
- ddl.op_type = TCatalogOpType.SHOW_FILES;
- ddl.setShow_files_params(analysis.getShowFilesStmt().toThrift());
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isDescribeDbStmt()) {
- ddl.op_type = TCatalogOpType.DESCRIBE_DB;
- ddl.setDescribe_db_params(analysis.getDescribeDbStmt().toThrift());
- metadata.setColumns(Arrays.asList(
- new TColumn("name", Type.STRING.toThrift()),
- new TColumn("location", Type.STRING.toThrift()),
- new TColumn("comment", Type.STRING.toThrift())));
- } else if (analysis.isDescribeTableStmt()) {
- ddl.op_type = TCatalogOpType.DESCRIBE_TABLE;
- ddl.setDescribe_table_params(analysis.getDescribeTableStmt().toThrift());
- metadata.setColumns(Arrays.asList(
- new TColumn("name", Type.STRING.toThrift()),
- new TColumn("type", Type.STRING.toThrift()),
- new TColumn("comment", Type.STRING.toThrift())));
- } else if (analysis.isAlterTableStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.ALTER_TABLE);
- req.setAlter_table_params(analysis.getAlterTableStmt().toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isAlterViewStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.ALTER_VIEW);
- req.setAlter_view_params(analysis.getAlterViewStmt().toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isCreateTableStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.CREATE_TABLE);
- req.setCreate_table_params(analysis.getCreateTableStmt().toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isCreateTableAsSelectStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.CREATE_TABLE_AS_SELECT);
- req.setCreate_table_params(
- analysis.getCreateTableAsSelectStmt().getCreateStmt().toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Arrays.asList(
- new TColumn("summary", Type.STRING.toThrift())));
- } else if (analysis.isCreateTableLikeStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.CREATE_TABLE_LIKE);
- req.setCreate_table_like_params(analysis.getCreateTableLikeStmt().toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isCreateViewStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.CREATE_VIEW);
- req.setCreate_view_params(analysis.getCreateViewStmt().toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isCreateDbStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.CREATE_DATABASE);
- req.setCreate_db_params(analysis.getCreateDbStmt().toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isCreateUdfStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- CreateUdfStmt stmt = (CreateUdfStmt) analysis.getStmt();
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.CREATE_FUNCTION);
- req.setCreate_fn_params(stmt.toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isCreateUdaStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.CREATE_FUNCTION);
- CreateUdaStmt stmt = (CreateUdaStmt)analysis.getStmt();
- req.setCreate_fn_params(stmt.toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isCreateDataSrcStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.CREATE_DATA_SOURCE);
- CreateDataSrcStmt stmt = (CreateDataSrcStmt)analysis.getStmt();
- req.setCreate_data_source_params(stmt.toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isComputeStatsStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.COMPUTE_STATS);
- req.setCompute_stats_params(analysis.getComputeStatsStmt().toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isDropDbStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.DROP_DATABASE);
- req.setDrop_db_params(analysis.getDropDbStmt().toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isDropTableOrViewStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- DropTableOrViewStmt stmt = analysis.getDropTableOrViewStmt();
- req.setDdl_type(stmt.isDropTable() ? TDdlType.DROP_TABLE : TDdlType.DROP_VIEW);
- req.setDrop_table_or_view_params(stmt.toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isTruncateStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- TruncateStmt stmt = analysis.getTruncateStmt();
- req.setDdl_type(TDdlType.TRUNCATE_TABLE);
- req.setTruncate_params(stmt.toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isDropFunctionStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.DROP_FUNCTION);
- DropFunctionStmt stmt = (DropFunctionStmt)analysis.getStmt();
- req.setDrop_fn_params(stmt.toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isDropDataSrcStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.DROP_DATA_SOURCE);
- DropDataSrcStmt stmt = (DropDataSrcStmt)analysis.getStmt();
- req.setDrop_data_source_params(stmt.toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isDropStatsStmt()) {
- ddl.op_type = TCatalogOpType.DDL;
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(TDdlType.DROP_STATS);
- DropStatsStmt stmt = (DropStatsStmt) analysis.getStmt();
- req.setDrop_stats_params(stmt.toThrift());
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isResetMetadataStmt()) {
- ddl.op_type = TCatalogOpType.RESET_METADATA;
- ResetMetadataStmt resetMetadataStmt = (ResetMetadataStmt) analysis.getStmt();
- TResetMetadataRequest req = resetMetadataStmt.toThrift();
- ddl.setReset_metadata_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isShowRolesStmt()) {
- ddl.op_type = TCatalogOpType.SHOW_ROLES;
- ShowRolesStmt showRolesStmt = (ShowRolesStmt) analysis.getStmt();
- ddl.setShow_roles_params(showRolesStmt.toThrift());
- Set<String> groupNames =
- getAuthzChecker().getUserGroups(analysis.getAnalyzer().getUser());
- // Check if the user is part of the group (case-sensitive) this SHOW ROLES
- // statement is targeting. If they are already a member of the group,
- // the admin requirement can be removed.
- Preconditions.checkState(ddl.getShow_roles_params().isSetIs_admin_op());
- if (ddl.getShow_roles_params().isSetGrant_group() &&
- groupNames.contains(ddl.getShow_roles_params().getGrant_group())) {
- ddl.getShow_roles_params().setIs_admin_op(false);
- }
- metadata.setColumns(Arrays.asList(
- new TColumn("role_name", Type.STRING.toThrift())));
- } else if (analysis.isShowGrantRoleStmt()) {
- ddl.op_type = TCatalogOpType.SHOW_GRANT_ROLE;
- ShowGrantRoleStmt showGrantRoleStmt = (ShowGrantRoleStmt) analysis.getStmt();
- ddl.setShow_grant_role_params(showGrantRoleStmt.toThrift());
- Set<String> groupNames =
- getAuthzChecker().getUserGroups(analysis.getAnalyzer().getUser());
- // User must be an admin to execute this operation if they have not been granted
- // this role.
- ddl.getShow_grant_role_params().setIs_admin_op(Sets.intersection(groupNames,
- showGrantRoleStmt.getRole().getGrantGroups()).isEmpty());
- metadata.setColumns(Arrays.asList(
- new TColumn("name", Type.STRING.toThrift())));
- } else if (analysis.isCreateDropRoleStmt()) {
- CreateDropRoleStmt createDropRoleStmt = (CreateDropRoleStmt) analysis.getStmt();
- TCreateDropRoleParams params = createDropRoleStmt.toThrift();
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(params.isIs_drop() ? TDdlType.DROP_ROLE : TDdlType.CREATE_ROLE);
- req.setCreate_drop_role_params(params);
- ddl.op_type = TCatalogOpType.DDL;
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isGrantRevokeRoleStmt()) {
- GrantRevokeRoleStmt grantRoleStmt = (GrantRevokeRoleStmt) analysis.getStmt();
- TGrantRevokeRoleParams params = grantRoleStmt.toThrift();
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(params.isIs_grant() ? TDdlType.GRANT_ROLE : TDdlType.REVOKE_ROLE);
- req.setGrant_revoke_role_params(params);
- ddl.op_type = TCatalogOpType.DDL;
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else if (analysis.isGrantRevokePrivStmt()) {
- GrantRevokePrivStmt grantRevokePrivStmt = (GrantRevokePrivStmt) analysis.getStmt();
- TGrantRevokePrivParams params = grantRevokePrivStmt.toThrift();
- TDdlExecRequest req = new TDdlExecRequest();
- req.setDdl_type(params.isIs_grant() ?
- TDdlType.GRANT_PRIVILEGE : TDdlType.REVOKE_PRIVILEGE);
- req.setGrant_revoke_priv_params(params);
- ddl.op_type = TCatalogOpType.DDL;
- ddl.setDdl_params(req);
- metadata.setColumns(Collections.<TColumn>emptyList());
- } else {
- throw new IllegalStateException("Unexpected CatalogOp statement type.");
- }
-
- result.setResult_set_metadata(metadata);
- result.setCatalog_op_request(ddl);
- if (ddl.getOp_type() == TCatalogOpType.DDL) {
- TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader();
- header.setRequesting_user(analysis.getAnalyzer().getUser().getName());
- ddl.getDdl_params().setHeader(header);
- }
- }
-
- /**
- * Loads a table or partition with one or more data files. If the "overwrite" flag
- * in the request is true, all existing data in the table/partition will be replaced.
- * If the "overwrite" flag is false, the files will be added alongside any existing
- * data files.
- */
- public TLoadDataResp loadTableData(TLoadDataReq request) throws ImpalaException,
- IOException {
- TableName tableName = TableName.fromThrift(request.getTable_name());
-
- // Get the destination for the load. If the load is targeting a partition,
- // this is the partition location. Otherwise this is the table location.
- String destPathString = null;
- if (request.isSetPartition_spec()) {
- destPathString = impaladCatalog_.getHdfsPartition(tableName.getDb(),
- tableName.getTbl(), request.getPartition_spec()).getLocation();
- } else {
- destPathString = impaladCatalog_.getTable(tableName.getDb(), tableName.getTbl())
- .getMetaStoreTable().getSd().getLocation();
- }
-
- Path destPath = new Path(destPathString);
- Path sourcePath = new Path(request.source_path);
- FileSystem destFs = destPath.getFileSystem(FileSystemUtil.getConfiguration());
- FileSystem sourceFs = sourcePath.getFileSystem(FileSystemUtil.getConfiguration());
-
- // Create a temporary directory within the final destination directory to stage the
- // file move.
- Path tmpDestPath = FileSystemUtil.makeTmpSubdirectory(destPath);
-
- int filesLoaded = 0;
- if (sourceFs.isDirectory(sourcePath)) {
- filesLoaded = FileSystemUtil.relocateAllVisibleFiles(sourcePath, tmpDestPath);
- } else {
- FileSystemUtil.relocateFile(sourcePath, tmpDestPath, true);
- filesLoaded = 1;
- }
-
- // If this is an OVERWRITE, delete all files in the destination.
- if (request.isOverwrite()) {
- FileSystemUtil.deleteAllVisibleFiles(destPath);
- }
-
- // Move the files from the temporary location to the final destination.
- FileSystemUtil.relocateAllVisibleFiles(tmpDestPath, destPath);
- // Cleanup the tmp directory.
- destFs.delete(tmpDestPath, true);
- TLoadDataResp response = new TLoadDataResp();
- TColumnValue col = new TColumnValue();
- String loadMsg = String.format(
- "Loaded %d file(s). Total files in destination location: %d",
- filesLoaded, FileSystemUtil.getTotalNumVisibleFiles(destPath));
- col.setString_val(loadMsg);
- response.setLoad_summary(new TResultRow(Lists.newArrayList(col)));
- return response;
- }
-
- /**
- * Parses and plans a query in order to generate its explain string. This method does
- * not increase the query id counter.
- */
- public String getExplainString(TQueryCtx queryCtx) throws ImpalaException {
- StringBuilder stringBuilder = new StringBuilder();
- createExecRequest(queryCtx, stringBuilder);
- return stringBuilder.toString();
- }
-
- /**
- * Returns all tables in database 'dbName' that match the pattern of 'matcher' and are
- * accessible to 'user'.
- */
- public List<String> getTableNames(String dbName, PatternMatcher matcher,
- User user) throws ImpalaException {
- List<String> tblNames = impaladCatalog_.getTableNames(dbName, matcher);
- if (authzConfig_.isEnabled()) {
- Iterator<String> iter = tblNames.iterator();
- while (iter.hasNext()) {
- String tblName = iter.next();
- PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder()
- .any().onAnyColumn(dbName, tblName).toRequest();
- if (!authzChecker_.get().hasAccess(user, privilegeRequest)) {
- iter.remove();
- }
- }
- }
- return tblNames;
- }
-
- /**
- * Returns the columns of 'table' that match 'matcher' and are accessible
- * to the given user.
- */
- public List<Column> getColumns(Table table, PatternMatcher matcher,
- User user) throws InternalException {
- Preconditions.checkNotNull(table);
- Preconditions.checkNotNull(matcher);
- List<Column> columns = Lists.newArrayList();
- for (Column column: table.getColumnsInHiveOrder()) {
- String colName = column.getName();
- if (!matcher.matches(colName)) continue;
- if (authzConfig_.isEnabled()) {
- PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder()
- .any().onColumn(table.getTableName().getDb(), table.getTableName().getTbl(),
- colName).toRequest();
- if (!authzChecker_.get().hasAccess(user, privilegeRequest)) continue;
- }
- columns.add(column);
- }
- return columns;
- }
-
- /**
- * Returns all databases in catalog cache that match the pattern of 'matcher' and are
- * accessible to 'user'.
- */
- public List<Db> getDbs(PatternMatcher matcher, User user)
- throws InternalException {
- List<Db> dbs = impaladCatalog_.getDbs(matcher);
- // If authorization is enabled, filter out the databases the user does not
- // have permissions on.
- if (authzConfig_.isEnabled()) {
- Iterator<Db> iter = dbs.iterator();
- while (iter.hasNext()) {
- Db db = iter.next();
- if (!isAccessibleToUser(db, user)) iter.remove();
- }
- }
- return dbs;
- }
-
- /**
- * Checks whether the database is accessible to the given user.
- */
- private boolean isAccessibleToUser(Db db, User user)
- throws InternalException {
- if (db.getName().toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) {
- // Default DB should always be shown.
- return true;
- }
- PrivilegeRequest request = new PrivilegeRequestBuilder()
- .any().onAnyTable(db.getName()).toRequest();
- return authzChecker_.get().hasAccess(user, request);
- }
-
- /**
- * Returns all data sources that match the pattern. If pattern is null,
- * matches all data sources.
- */
- public List<DataSource> getDataSrcs(String pattern) {
- return impaladCatalog_.getDataSources(
- PatternMatcher.createHivePatternMatcher(pattern));
- }
-
- /**
- * Generate result set and schema for a SHOW COLUMN STATS command.
- */
- public TResultSet getColumnStats(String dbName, String tableName)
- throws ImpalaException {
- Table table = impaladCatalog_.getTable(dbName, tableName);
- TResultSet result = new TResultSet();
- TResultSetMetadata resultSchema = new TResultSetMetadata();
- result.setSchema(resultSchema);
- resultSchema.addToColumns(new TColumn("Column", Type.STRING.toThrift()));
- resultSchema.addToColumns(new TColumn("Type", Type.STRING.toThrift()));
- resultSchema.addToColumns(
- new TColumn("#Distinct Values", Type.BIGINT.toThrift()));
- resultSchema.addToColumns(new TColumn("#Nulls", Type.BIGINT.toThrift()));
- resultSchema.addToColumns(new TColumn("Max Size", Type.INT.toThrift()));
- resultSchema.addToColumns(new TColumn("Avg Size", Type.DOUBLE.toThrift()));
-
- for (Column c: table.getColumnsInHiveOrder()) {
- TResultRowBuilder rowBuilder = new TResultRowBuilder();
- // Add name, type, NDVs, numNulls, max size and avg size.
- rowBuilder.add(c.getName()).add(c.getType().toSql())
- .add(c.getStats().getNumDistinctValues()).add(c.getStats().getNumNulls())
- .add(c.getStats().getMaxSize()).add(c.getStats().getAvgSize());
- result.addToRows(rowBuilder.get());
- }
- return result;
- }
-
- /**
- * Generate result set and schema for a SHOW TABLE STATS command.
- */
- public TResultSet getTableStats(String dbName, String tableName)
- throws ImpalaException {
- Table table = impaladCatalog_.getTable(dbName, tableName);
- if (table instanceof HdfsTable) {
- return ((HdfsTable) table).getTableStats();
- } else if (table instanceof HBaseTable) {
- return ((HBaseTable) table).getTableStats();
- } else if (table instanceof DataSourceTable) {
- return ((DataSourceTable) table).getTableStats();
- } else if (table instanceof KuduTable) {
- return ((KuduTable) table).getTableStats();
- } else {
- throw new InternalException("Invalid table class: " + table.getClass());
- }
- }
-
- /**
- * Returns all function signatures that match the pattern. If pattern is null,
- * matches all functions. If exactMatch is true, treats fnPattern as a function
- * name instead of a pattern and returns exact matches only.
- */
- public List<Function> getFunctions(TFunctionCategory category,
- String dbName, String fnPattern, boolean exactMatch)
- throws DatabaseNotFoundException {
- Db db = impaladCatalog_.getDb(dbName);
- if (db == null) {
- throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
- }
- List<Function> fns;
- if (exactMatch) {
- Preconditions.checkNotNull(fnPattern, "Invalid function name");
- fns = db.getFunctions(category, fnPattern);
- } else {
- fns = db.getFunctions(
- category, PatternMatcher.createHivePatternMatcher(fnPattern));
- }
- Collections.sort(fns,
- new Comparator<Function>() {
- @Override
- public int compare(Function f1, Function f2) {
- return f1.signatureString().compareTo(f2.signatureString());
- }
- });
- return fns;
- }
-
- /**
- * Returns metadata for the specified database. Throws an exception if the db is
- * not found or if there is an error loading the db metadata.
- */
- public TDescribeResult describeDb(String dbName, TDescribeOutputStyle outputStyle)
- throws ImpalaException {
- Db db = impaladCatalog_.getDb(dbName);
- return DescribeResultFactory.buildDescribeDbResult(db, outputStyle);
- }
-
- /**
- * Returns table metadata, such as the column descriptors, for the specified table.
- * Throws an exception if the table or db is not found or if there is an error loading
- * the table metadata.
- */
- public TDescribeResult describeTable(String dbName, String tableName,
- TDescribeOutputStyle outputStyle, TColumnType tResultStruct)
- throws ImpalaException {
- if (outputStyle == TDescribeOutputStyle.MINIMAL) {
- StructType resultStruct = (StructType)Type.fromThrift(tResultStruct);
- return DescribeResultFactory.buildDescribeMinimalResult(resultStruct);
- } else {
- Preconditions.checkArgument(outputStyle == TDescribeOutputStyle.FORMATTED ||
- outputStyle == TDescribeOutputStyle.EXTENDED);
- Table table = impaladCatalog_.getTable(dbName, tableName);
- return DescribeResultFactory.buildDescribeFormattedResult(table);
- }
- }
-
- /**
- * Given a set of table names, returns the set of table names that are missing
- * metadata (are not yet loaded).
- */
- private Set<TableName> getMissingTbls(Set<TableName> tableNames) {
- Set<TableName> missingTbls = new HashSet<TableName>();
- for (TableName tblName: tableNames) {
- Db db = getCatalog().getDb(tblName.getDb());
- if (db == null) continue;
- Table tbl = db.getTable(tblName.getTbl());
- if (tbl == null) continue;
- if (!tbl.isLoaded()) missingTbls.add(tblName);
- }
- return missingTbls;
- }
-
- /**
-   * Requests that the catalog server load the given set of tables and waits until
- * these tables show up in the local catalog, or the given timeout has been reached.
- * The timeout is specified in milliseconds, with a value <= 0 indicating no timeout.
- * The exact steps taken are:
- * 1) Collect the tables that are missing (not yet loaded locally).
- * 2) Make an RPC to the CatalogServer to prioritize the loading of these tables.
- * 3) Wait until the local catalog contains all missing tables by (re)checking the
- * catalog each time a new catalog update is received.
- *
- * Returns true if all missing tables were received before timing out and false if
- * the timeout was reached before all tables were received.
- */
- private boolean requestTblLoadAndWait(Set<TableName> requestedTbls, long timeoutMs)
- throws InternalException {
- Set<TableName> missingTbls = getMissingTbls(requestedTbls);
-    // If there are no missing tables, return early and avoid an RPC to the CatalogServer.
- if (missingTbls.isEmpty()) return true;
-
- // Call into the CatalogServer and request the required tables be loaded.
- LOG.info(String.format("Requesting prioritized load of table(s): %s",
- Joiner.on(", ").join(missingTbls)));
- TStatus status = FeSupport.PrioritizeLoad(missingTbls);
- if (status.getStatus_code() != TErrorCode.OK) {
- throw new InternalException("Error requesting prioritized load: " +
- Joiner.on("\n").join(status.getError_msgs()));
- }
-
- long startTimeMs = System.currentTimeMillis();
- // Wait until all the required tables are loaded in the Impalad's catalog cache.
- while (!missingTbls.isEmpty()) {
- // Check if the timeout has been reached.
- if (timeoutMs > 0 && System.currentTimeMillis() - startTimeMs > timeoutMs) {
- return false;
- }
-
- LOG.trace(String.format("Waiting for table(s) to complete loading: %s",
- Joiner.on(", ").join(missingTbls)));
- getCatalog().waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS);
- missingTbls = getMissingTbls(missingTbls);
- // TODO: Check for query cancellation here.
- }
- return true;
- }
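
The loop above is a standard poll-with-deadline idiom; the same shape in
isolation (all helper names here are hypothetical):

  long startTimeMs = System.currentTimeMillis();
  while (!outstanding.isEmpty()) {
    if (timeoutMs > 0 && System.currentTimeMillis() - startTimeMs > timeoutMs) {
      return false;  // deadline reached before all work finished
    }
    waitForNextUpdate(maxWaitMs);             // hypothetical: wakes on each update
    outstanding = stillMissing(outstanding);  // shrink the outstanding set
  }
  return true;
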
-
- /**
- * Overload of requestTblLoadAndWait that uses the default timeout.
- */
- public boolean requestTblLoadAndWait(Set<TableName> requestedTbls)
- throws InternalException {
- return requestTblLoadAndWait(requestedTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS);
- }
-
- /**
- * Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
- * Authorizes all catalog object accesses and throws an AuthorizationException
- * if the user does not have privileges to access one or more objects.
- * If a statement fails analysis because table/view metadata was not loaded, an
- * RPC to the CatalogServer will be executed to request loading the missing metadata
- * and analysis will be restarted once the required tables have been loaded
- * in the local Impalad Catalog or the MISSING_TBL_LOAD_WAIT_TIMEOUT_MS timeout
- * is reached.
-   * The goal of this timeout is not to fail analysis, but to restart the
-   * analysis/missing-table collection process. This helps ensure a statement never
-   * waits indefinitely for a table to be loaded in the event the table metadata was
-   * invalidated.
- * TODO: Also consider adding an overall timeout that fails analysis.
- */
- private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
- throws AnalysisException, InternalException, AuthorizationException {
- if (!impaladCatalog_.isReady()) {
- throw new AnalysisException("This Impala daemon is not ready to accept user " +
- "requests. Status: Waiting for catalog update from the StateStore.");
- }
-
- AnalysisContext analysisCtx = new AnalysisContext(impaladCatalog_, queryCtx,
- authzConfig_);
- LOG.debug("analyze query " + queryCtx.request.stmt);
-
-    // Run analysis in a loop until any of the following events occurs:
- // 1) Analysis completes successfully.
- // 2) Analysis fails with an AnalysisException AND there are no missing tables.
- // 3) Analysis fails with an AuthorizationException.
- try {
- while (true) {
- try {
- analysisCtx.analyze(queryCtx.request.stmt);
- Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty());
- return analysisCtx.getAnalysisResult();
- } catch (AnalysisException e) {
- Set<TableName> missingTbls = analysisCtx.getAnalyzer().getMissingTbls();
- // Only re-throw the AnalysisException if there were no missing tables.
- if (missingTbls.isEmpty()) throw e;
-
- // Some tables/views were missing, request and wait for them to load.
- if (!requestTblLoadAndWait(missingTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS)) {
- LOG.info(String.format("Missing tables were not received in %dms. Load " +
- "request will be retried.", MISSING_TBL_LOAD_WAIT_TIMEOUT_MS));
- }
- }
- }
- } finally {
- // Authorize all accesses.
- // AuthorizationExceptions must take precedence over any AnalysisException
- // that has been thrown, so perform the authorization first.
- analysisCtx.authorize(getAuthzChecker());
- }
- }
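
The retry structure of analyzeStmt is easier to see stripped down; a sketch
assuming the hypothetical helpers analyzeOnce(), missingTables() and
requestLoadAndWait():

  while (true) {
    try {
      return analyzeOnce(stmt);
    } catch (AnalysisException e) {
      Set<TableName> missing = missingTables();
      if (missing.isEmpty()) throw e;  // genuine analysis error: surface it
      requestLoadAndWait(missing);     // may time out; the loop simply retries
    }
  }
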
-
- /**
- * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
- */
- public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
- throws ImpalaException {
- // Analyze the statement
- AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
- EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
- timeline.markEvent("Analysis finished");
- Preconditions.checkNotNull(analysisResult.getStmt());
- TExecRequest result = new TExecRequest();
- result.setQuery_options(queryCtx.request.getQuery_options());
- result.setAccess_events(analysisResult.getAccessEvents());
- result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
-
- if (analysisResult.isCatalogOp()) {
- result.stmt_type = TStmtType.DDL;
- createCatalogOpRequest(analysisResult, result);
- TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
- if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
- result.catalog_op_request.setLineage_graph(thriftLineageGraph);
- }
-      // All DDL operations except CTAS are fully handled by analysis; return early.
- if (!analysisResult.isCreateTableAsSelectStmt()) return result;
- } else if (analysisResult.isLoadDataStmt()) {
- result.stmt_type = TStmtType.LOAD;
- result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
- new TColumn("summary", Type.STRING.toThrift()))));
- result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
- return result;
- } else if (analysisResult.isSetStmt()) {
- result.stmt_type = TStmtType.SET;
- result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
- new TColumn("option", Type.STRING.toThrift()),
- new TColumn("value", Type.STRING.toThrift()))));
- result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
- return result;
- }
-
- // create TQueryExecRequest
- Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
- || analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
- || analysisResult.isDeleteStmt());
-
- TQueryExecRequest queryExecRequest = new TQueryExecRequest();
- // create plan
- LOG.debug("create plan");
- Planner planner = new Planner(analysisResult, queryCtx);
- if (RuntimeEnv.INSTANCE.isTestEnv()
- && queryCtx.request.query_options.mt_num_cores > 0) {
- // TODO: this is just to be able to run tests; implement this
- List<PlanFragment> planRoots = planner.createParallelPlans();
- for (PlanFragment planRoot: planRoots) {
- TPlanFragmentTree thriftPlan = planRoot.treeToThrift();
- queryExecRequest.addToMt_plans(thriftPlan);
- }
- queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
- queryExecRequest.setQuery_ctx(queryCtx);
- explainString.append(planner.getExplainString(
- Lists.newArrayList(planRoots.get(0)), queryExecRequest,
- TExplainLevel.STANDARD));
- queryExecRequest.setQuery_plan(explainString.toString());
- result.setQuery_exec_request(queryExecRequest);
- return result;
- }
- ArrayList<PlanFragment> fragments = planner.createPlan();
-
- List<ScanNode> scanNodes = Lists.newArrayList();
- // map from fragment to its index in queryExecRequest.fragments; needed for
- // queryExecRequest.dest_fragment_idx
- Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
-
- for (int idx = 0; idx < fragments.size(); ++idx) {
- PlanFragment fragment = fragments.get(idx);
- Preconditions.checkNotNull(fragment.getPlanRoot());
- fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
- fragmentIdx.put(fragment, idx);
- }
-
- // set fragment destinations
- for (int i = 1; i < fragments.size(); ++i) {
- PlanFragment dest = fragments.get(i).getDestFragment();
- Integer idx = fragmentIdx.get(dest);
- Preconditions.checkState(idx != null);
- queryExecRequest.addToDest_fragment_idx(idx.intValue());
- }
-
-    // Set scan ranges/locations for scan nodes.
-    // Also assemble a list of the names of tables missing stats for a warning message.
- LOG.debug("get scan range locations");
- Set<TTableName> tablesMissingStats = Sets.newTreeSet();
- // Assemble a similar list for corrupt stats
- Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
- for (ScanNode scanNode: scanNodes) {
- queryExecRequest.putToPer_node_scan_ranges(
- scanNode.getId().asInt(),
- scanNode.getScanRangeLocations());
- if (scanNode.isTableMissingStats()) {
- tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
- }
- if (scanNode.hasCorruptTableStats()) {
- tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift());
- }
- }
-
- queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
- for (TTableName tableName: tablesMissingStats) {
- queryCtx.addToTables_missing_stats(tableName);
- }
- for (TTableName tableName: tablesWithCorruptStats) {
- queryCtx.addToTables_with_corrupt_stats(tableName);
- }
-
- // Optionally disable spilling in the backend. Allow spilling if there are plan hints
- // or if all tables have stats.
- if (queryCtx.request.query_options.isDisable_unsafe_spills()
- && !tablesMissingStats.isEmpty()
- && !analysisResult.getAnalyzer().hasPlanHints()) {
- queryCtx.setDisable_spilling(true);
- }
-
-    // Compute resource requirements after setting scan range locations because the
-    // cost estimates of scan nodes rely on them.
- try {
- planner.computeResourceReqs(fragments, true, queryExecRequest);
- } catch (Exception e) {
- // Turn exceptions into a warning to allow the query to execute.
- LOG.error("Failed to compute resource requirements for query\n" +
- queryCtx.request.getStmt(), e);
- }
-
-    // Each fragment at this point has all of its state set; serialize them to Thrift.
- for (PlanFragment fragment: fragments) {
- TPlanFragment thriftFragment = fragment.toThrift();
- queryExecRequest.addToFragments(thriftFragment);
- }
-
- // Use EXTENDED by default for all non-explain statements.
- TExplainLevel explainLevel = TExplainLevel.EXTENDED;
- // Use the query option for explain stmts and tests (e.g., planner tests).
- if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
- explainLevel = queryCtx.request.query_options.getExplain_level();
- }
-
- // Global query parameters to be set in each TPlanExecRequest.
- queryExecRequest.setQuery_ctx(queryCtx);
-
- explainString.append(
- planner.getExplainString(fragments, queryExecRequest, explainLevel));
- queryExecRequest.setQuery_plan(explainString.toString());
- queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
-
- TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
- if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
- queryExecRequest.setLineage_graph(thriftLineageGraph);
- }
-
- if (analysisResult.isExplainStmt()) {
- // Return the EXPLAIN request
- createExplainRequest(explainString.toString(), result);
- return result;
- }
-
- result.setQuery_exec_request(queryExecRequest);
-
- if (analysisResult.isQueryStmt()) {
- // fill in the metadata
- LOG.debug("create result set metadata");
- result.stmt_type = TStmtType.QUERY;
- result.query_exec_request.stmt_type = result.stmt_type;
- TResultSetMetadata metadata = new TResultSetMetadata();
- QueryStmt queryStmt = analysisResult.getQueryStmt();
- int colCnt = queryStmt.getColLabels().size();
- for (int i = 0; i < colCnt; ++i) {
- TColumn colDesc = new TColumn();
- colDesc.columnName = queryStmt.getColLabels().get(i);
- colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift();
- metadata.addToColumns(colDesc);
- }
- result.setResult_set_metadata(metadata);
- } else if (analysisResult.isInsertStmt() ||
- analysisResult.isCreateTableAsSelectStmt()) {
- // For CTAS the overall TExecRequest statement type is DDL, but the
- // query_exec_request should be DML
- result.stmt_type =
- analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
- result.query_exec_request.stmt_type = TStmtType.DML;
-
- // create finalization params of insert stmt
- InsertStmt insertStmt = analysisResult.getInsertStmt();
- if (insertStmt.getTargetTable() instanceof HdfsTable) {
- TFinalizeParams finalizeParams = new TFinalizeParams();
- finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
- finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
- finalizeParams.setTable_id(insertStmt.getTargetTable().getId().asInt());
- String db = insertStmt.getTargetTableName().getDb();
- finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
- HdfsTable hdfsTable = (HdfsTable) insertStmt.getTargetTable();
- finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
- finalizeParams.setStaging_dir(
- hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
- queryExecRequest.setFinalize_params(finalizeParams);
- }
- } else {
-      Preconditions.checkState(
-          analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt());
- result.stmt_type = TStmtType.DML;
- result.query_exec_request.stmt_type = TStmtType.DML;
- }
-
- validateTableIds(analysisResult.getAnalyzer(), result);
-
- timeline.markEvent("Planning finished");
- result.setTimeline(analysisResult.getAnalyzer().getTimeline().toThrift());
- return result;
- }
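
Condensed, the stmt_type decisions made above amount to the following (a summary
sketch with a hypothetical helper name, not code from this patch):

  TStmtType overallStmtType(AnalysisContext.AnalysisResult r) {
    if (r.isCatalogOp()) return TStmtType.DDL;     // CTAS stays DDL overall
    if (r.isLoadDataStmt()) return TStmtType.LOAD;
    if (r.isSetStmt()) return TStmtType.SET;
    if (r.isExplainStmt()) return TStmtType.EXPLAIN;
    if (r.isQueryStmt()) return TStmtType.QUERY;
    return TStmtType.DML;                          // INSERT/UPDATE/DELETE
  }
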
-
- /**
- * Check that we don't have any duplicate table IDs (see IMPALA-1702).
- * To be removed when IMPALA-1702 is resolved.
- */
- private void validateTableIds(Analyzer analyzer, TExecRequest result)
- throws InternalException {
- Map<TableId, Table> tableIds = Maps.newHashMap();
- Collection<TupleDescriptor> tupleDescs = analyzer.getDescTbl().getTupleDescs();
- for (TupleDescriptor desc: tupleDescs) {
-      // Skip if the tuple descriptor did not come from a materializing scan.
- if (!desc.isMaterialized()) continue;
- Table table = desc.getTable();
- if (table == null) continue;
- Table otherTable = tableIds.get(table.getId());
- if (otherTable == table) continue; // Same table referenced twice
- if (otherTable == null) {
- tableIds.put(table.getId(), table);
- continue;
- }
- LOG.error("Found duplicate table ID! id=" + table.getId() + "\ntable1=\n"
- + table.toTCatalogObject() + "\ntable2=\n" + otherTable.toTCatalogObject()
- + "\nexec_request=\n" + result);
- throw new InternalException("Query encountered invalid metadata, likely due to " +
- "IMPALA-1702. Please try rerunning the query.");
- }
- }
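
The duplicate check above is a first-wins map; an equivalent compact variant
using the previous mapping that Map.put() returns (a sketch, not this patch's
code):

  Map<TableId, Table> seen = Maps.newHashMap();
  for (Table t: tables) {
    Table prev = seen.put(t.getId(), t);  // put() returns the prior mapping
    if (prev != null && prev != t) {
      throw new IllegalStateException("duplicate table id: " + t.getId());
    }
  }
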
-
- /**
- * Attaches the explain result to the TExecRequest.
- */
- private void createExplainRequest(String explainString, TExecRequest result) {
- // update the metadata - one string column
- TColumn colDesc = new TColumn("Explain String", Type.STRING.toThrift());
- TResultSetMetadata metadata = new TResultSetMetadata(Lists.newArrayList(colDesc));
- result.setResult_set_metadata(metadata);
-
- // create the explain result set - split the explain string into one line per row
-    String[] explainStringArray = explainString.split("\n");
- TExplainResult explainResult = new TExplainResult();
- explainResult.results = Lists.newArrayList();
- for (int i = 0; i < explainStringArray.length; ++i) {
- TColumnValue col = new TColumnValue();
- col.setString_val(explainStringArray[i]);
- TResultRow row = new TResultRow(Lists.newArrayList(col));
- explainResult.results.add(row);
- }
- result.setExplain_result(explainResult);
- result.stmt_type = TStmtType.EXPLAIN;
- }
-
- /**
-   * Executes a HiveServer2 metadata operation and returns a TResultSet.
- */
- public TResultSet execHiveServer2MetadataOp(TMetadataOpRequest request)
- throws ImpalaException {
- User user = request.isSetSession() ?
- new User(TSessionStateUtil.getEffectiveUser(request.session)) :
- ImpalaInternalAdminUser.getInstance();
- switch (request.opcode) {
- case GET_TYPE_INFO: return MetadataOp.getTypeInfo();
- case GET_SCHEMAS:
- {
- TGetSchemasReq req = request.getGet_schemas_req();
- return MetadataOp.getSchemas(this, req.getCatalogName(),
- req.getSchemaName(), user);
- }
- case GET_TABLES:
- {
- TGetTablesReq req = request.getGet_tables_req();
- return MetadataOp.getTables(this, req.getCatalogName(),
- req.getSchemaName(), req.getTableName(), req.getTableTypes(), user);
- }
- case GET_COLUMNS:
- {
- TGetColumnsReq req = request.getGet_columns_req();
- return MetadataOp.getColumns(this, req.getCatalogName(),
- req.getSchemaName(), req.getTableName(), req.getColumnName(), user);
- }
- case GET_CATALOGS: return MetadataOp.getCatalogs();
- case GET_TABLE_TYPES: return MetadataOp.getTableTypes();
- case GET_FUNCTIONS:
- {
- TGetFunctionsReq req = request.getGet_functions_req();
- return MetadataOp.getFunctions(this, req.getCatalogName(),
- req.getSchemaName(), req.getFunctionName(), user);
- }
- default:
- throw new NotImplementedException(request.opcode + " has not been implemented.");
- }
- }
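
A hedged client-side sketch of building one of these requests; the setter names
follow the usual Thrift-generated pattern and the values are illustrative:

  TMetadataOpRequest request = new TMetadataOpRequest();
  request.setOpcode(TMetadataOpcode.GET_TABLES);
  TGetTablesReq tablesReq = new TGetTablesReq();
  tablesReq.setSchemaName("default");
  tablesReq.setTableName("%");  // HiveServer2 metadata ops use SQL LIKE patterns
  request.setGet_tables_req(tablesReq);
  TResultSet tables = frontend.execHiveServer2MetadataOp(request);
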
-
- /**
-   * Returns information about all files of a table or partition.
- */
- public TResultSet getTableFiles(TShowFilesParams request)
-      throws ImpalaException {
- Table table = impaladCatalog_.getTable(request.getTable_name().getDb_name(),
- request.getTable_name().getTable_name());
- if (table instanceof HdfsTable) {
- return ((HdfsTable) table).getFiles(request.getPartition_spec());
- } else {
- throw new InternalException("SHOW FILES only supports Hdfs table. " +
- "Unsupported table class: " + table.getClass());
- }
- }
-}
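
Finally, a usage sketch for getTableFiles above (field names follow the Thrift
definition of TShowFilesParams; the db/table values are illustrative):

  TShowFilesParams params = new TShowFilesParams();
  params.setTable_name(new TTableName("functional", "alltypes"));
  // partition_spec left unset: list files for the whole table
  TResultSet files = frontend.getTableFiles(params);
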