Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:43 UTC
[26/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/ScalarFunction.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ScalarFunction.java b/fe/src/main/java/com/cloudera/impala/catalog/ScalarFunction.java
deleted file mode 100644
index b921015..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/ScalarFunction.java
+++ /dev/null
@@ -1,295 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.api.FunctionType;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.ResourceType;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
-
-import com.cloudera.impala.analysis.FunctionName;
-import com.cloudera.impala.analysis.HdfsUri;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.hive.executor.UdfExecutor.JavaUdfDataType;
-import com.cloudera.impala.thrift.TFunction;
-import com.cloudera.impala.thrift.TFunctionBinaryType;
-import com.cloudera.impala.thrift.TScalarFunction;
-import com.cloudera.impala.thrift.TSymbolType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Internal representation of a scalar function.
- */
-public class ScalarFunction extends Function {
- // The name inside the binary at location_ that contains this particular
- // function. e.g. org.example.MyUdf.class.
- private String symbolName_;
- private String prepareFnSymbol_;
- private String closeFnSymbol_;
-
- public ScalarFunction(FunctionName fnName, ArrayList<Type> argTypes, Type retType,
- boolean hasVarArgs) {
- super(fnName, argTypes, retType, hasVarArgs);
- }
-
- public ScalarFunction(FunctionName fnName, List<Type> argTypes,
- Type retType, HdfsUri location, String symbolName, String initFnSymbol,
- String closeFnSymbol) {
- super(fnName, argTypes, retType, false);
- setLocation(location);
- setSymbolName(symbolName);
- setPrepareFnSymbol(initFnSymbol);
- setCloseFnSymbol(closeFnSymbol);
- }
-
- /**
- * Creates a builtin scalar function. This is a helper that wraps a few steps
- * into one call.
- */
- public static ScalarFunction createBuiltin(String name, ArrayList<Type> argTypes,
- boolean hasVarArgs, Type retType, String symbol,
- String prepareFnSymbol, String closeFnSymbol, boolean userVisible) {
- Preconditions.checkNotNull(symbol);
- ScalarFunction fn = new ScalarFunction(
- new FunctionName(Catalog.BUILTINS_DB, name), argTypes, retType, hasVarArgs);
- fn.setBinaryType(TFunctionBinaryType.BUILTIN);
- fn.setUserVisible(userVisible);
- fn.setIsPersistent(true);
- try {
- fn.symbolName_ = fn.lookupSymbol(symbol, TSymbolType.UDF_EVALUATE, null,
- fn.hasVarArgs(), fn.getArgs());
- } catch (AnalysisException e) {
- // This should never happen
- throw new RuntimeException("Builtin symbol '" + symbol + "'" + argTypes
- + " not found!", e);
- }
- if (prepareFnSymbol != null) {
- try {
- fn.prepareFnSymbol_ = fn.lookupSymbol(prepareFnSymbol, TSymbolType.UDF_PREPARE);
- } catch (AnalysisException e) {
- // This should never happen
- throw new RuntimeException(
- "Builtin symbol '" + prepareFnSymbol + "' not found!", e);
- }
- }
- if (closeFnSymbol != null) {
- try {
- fn.closeFnSymbol_ = fn.lookupSymbol(closeFnSymbol, TSymbolType.UDF_CLOSE);
- } catch (AnalysisException e) {
- // This should never happen
- throw new RuntimeException(
- "Builtin symbol '" + closeFnSymbol + "' not found!", e);
- }
- }
- return fn;
- }
-
- /**
- * Creates a Function object based on the following inputs.
- * @param dbName Name of fn's database
- * @param fnName Name of the function
- * @param fnClass Function symbol name
- * @param fnArgs List of Class objects corresponding to the args of evaluate method
- * @param fnRetType Class corresponding to the return type of the evaluate method
- * @param hdfsUri URI of the jar holding the udf class.
- * @return Function object corresponding to the Hive UDF if the parameters are
- *         compatible, null otherwise.
- */
- public static Function fromHiveFunction(String dbName, String fnName, String fnClass,
- Class<?>[] fnArgs, Class<?> fnRetType, String hdfsUri) {
- // Check if the return type and the method arguments are supported.
- // Currently we only support certain primitive types.
- JavaUdfDataType javaRetType = JavaUdfDataType.getType(fnRetType);
- if (javaRetType == JavaUdfDataType.INVALID_TYPE) return null;
- List<Type> fnArgsList = Lists.newArrayList();
- for (Class<?> argClass: fnArgs) {
- JavaUdfDataType javaUdfType = JavaUdfDataType.getType(argClass);
- if (javaUdfType == JavaUdfDataType.INVALID_TYPE) return null;
- fnArgsList.add(new ScalarType(
- PrimitiveType.fromThrift(javaUdfType.getPrimitiveType())));
- }
- ScalarType retType = new ScalarType(
- PrimitiveType.fromThrift(javaRetType.getPrimitiveType()));
- ScalarFunction fn = new ScalarFunction(new FunctionName(dbName, fnName), fnArgsList,
- retType, new HdfsUri(hdfsUri), fnClass, null, null);
- // We do not support varargs for Java UDFs, and neither does Hive.
- fn.setHasVarArgs(false);
- fn.setBinaryType(TFunctionBinaryType.JAVA);
- fn.setIsPersistent(true);
- return fn;
- }
-
- /**
- * Creates a Hive function object from 'this'. Returns null if 'this' is not
- * a Java UDF.
- */
- public org.apache.hadoop.hive.metastore.api.Function toHiveFunction() {
- if (getBinaryType() != TFunctionBinaryType.JAVA) return null;
- List<ResourceUri> resources = Lists.newArrayList(new ResourceUri(ResourceType.JAR,
- getLocation().toString()));
- return new org.apache.hadoop.hive.metastore.api.Function(functionName(), dbName(),
- symbolName_, "", PrincipalType.USER, (int) (System.currentTimeMillis() / 1000),
- FunctionType.JAVA, resources);
- }
-
- /**
- * Creates a builtin scalar operator function. This is a helper that wraps a few steps
- * into one call.
- * TODO: this needs to be kept in sync with what generates the BE operator
- * implementations (gen_functions.py). Is there a better way to coordinate this?
- */
- public static ScalarFunction createBuiltinOperator(String name,
- ArrayList<Type> argTypes, Type retType) {
- // Operators have a well defined symbol based on the function name and type.
- // Convert Add(TINYINT, TINYINT) --> Add_TinyIntVal_TinyIntVal
- String beFn = Character.toUpperCase(name.charAt(0)) + name.substring(1);
- boolean usesDecimal = false;
- for (int i = 0; i < argTypes.size(); ++i) {
- switch (argTypes.get(i).getPrimitiveType()) {
- case BOOLEAN:
- beFn += "_BooleanVal";
- break;
- case TINYINT:
- beFn += "_TinyIntVal";
- break;
- case SMALLINT:
- beFn += "_SmallIntVal";
- break;
- case INT:
- beFn += "_IntVal";
- break;
- case BIGINT:
- beFn += "_BigIntVal";
- break;
- case FLOAT:
- beFn += "_FloatVal";
- break;
- case DOUBLE:
- beFn += "_DoubleVal";
- break;
- case STRING:
- case VARCHAR:
- beFn += "_StringVal";
- break;
- case CHAR:
- beFn += "_Char";
- break;
- case TIMESTAMP:
- beFn += "_TimestampVal";
- break;
- case DECIMAL:
- beFn += "_DecimalVal";
- usesDecimal = true;
- break;
- default:
- Preconditions.checkState(false,
- "Argument type not supported: " + argTypes.get(i).toSql());
- }
- }
- String beClass = usesDecimal ? "DecimalOperators" : "Operators";
- String symbol = "impala::" + beClass + "::" + beFn;
- return createBuiltinOperator(name, symbol, argTypes, retType);
- }
-
- public static ScalarFunction createBuiltinOperator(String name, String symbol,
- ArrayList<Type> argTypes, Type retType) {
- return createBuiltin(name, symbol, argTypes, false, retType, false);
- }
-
- public static ScalarFunction createBuiltin(String name, String symbol,
- ArrayList<Type> argTypes, boolean hasVarArgs, Type retType,
- boolean userVisible) {
- ScalarFunction fn = new ScalarFunction(
- new FunctionName(Catalog.BUILTINS_DB, name), argTypes, retType, hasVarArgs);
- fn.setBinaryType(TFunctionBinaryType.BUILTIN);
- fn.setUserVisible(userVisible);
- fn.setIsPersistent(true);
- try {
- fn.symbolName_ = fn.lookupSymbol(symbol, TSymbolType.UDF_EVALUATE, null,
- fn.hasVarArgs(), fn.getArgs());
- } catch (AnalysisException e) {
- // This should never happen
- Preconditions.checkState(false, "Builtin symbol '" + symbol + "'" + argTypes
- + " not found!" + e.getStackTrace());
- throw new RuntimeException("Builtin symbol not found!", e);
- }
- return fn;
- }
-
- /**
- * Create a function that is used to search the catalog for a matching builtin. Only
- * the fields necessary for matching function prototypes are specified.
- */
- public static ScalarFunction createBuiltinSearchDesc(String name, Type[] argTypes,
- boolean hasVarArgs) {
- ArrayList<Type> fnArgs =
- (argTypes == null) ? new ArrayList<Type>() : Lists.newArrayList(argTypes);
- ScalarFunction fn = new ScalarFunction(
- new FunctionName(Catalog.BUILTINS_DB, name), fnArgs, Type.INVALID, hasVarArgs);
- fn.setBinaryType(TFunctionBinaryType.BUILTIN);
- return fn;
- }
-
- /**
- * Static helper method to create a scalar function of the given
- * TFunctionBinaryType.
- */
- public static ScalarFunction createForTesting(String db,
- String fnName, ArrayList<Type> args, Type retType, String uriPath,
- String symbolName, String initFnSymbol, String closeFnSymbol,
- TFunctionBinaryType type) {
- ScalarFunction fn = new ScalarFunction(new FunctionName(db, fnName), args,
- retType, new HdfsUri(uriPath), symbolName, initFnSymbol, closeFnSymbol);
- fn.setBinaryType(type);
- fn.setIsPersistent(true);
- return fn;
- }
-
- public void setSymbolName(String s) { symbolName_ = s; }
- public void setPrepareFnSymbol(String s) { prepareFnSymbol_ = s; }
- public void setCloseFnSymbol(String s) { closeFnSymbol_ = s; }
-
- public String getSymbolName() { return symbolName_; }
- public String getPrepareFnSymbol() { return prepareFnSymbol_; }
- public String getCloseFnSymbol() { return closeFnSymbol_; }
-
- @Override
- public String toSql(boolean ifNotExists) {
- StringBuilder sb = new StringBuilder("CREATE FUNCTION ");
- if (ifNotExists) sb.append("IF NOT EXISTS ");
- sb.append(dbName() + "." + signatureString() + "\n")
- .append(" RETURNS " + getReturnType() + "\n")
- .append(" LOCATION '" + getLocation() + "'\n")
- .append(" SYMBOL='" + getSymbolName() + "'\n");
- return sb.toString();
- }
-
- @Override
- public TFunction toThrift() {
- TFunction fn = super.toThrift();
- fn.setScalar_fn(new TScalarFunction());
- fn.getScalar_fn().setSymbol(symbolName_);
- if (prepareFnSymbol_ != null) fn.getScalar_fn().setPrepare_fn_symbol(prepareFnSymbol_);
- if (closeFnSymbol_ != null) fn.getScalar_fn().setClose_fn_symbol(closeFnSymbol_);
- return fn;
- }
-}
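
A note on the operator symbols built by createBuiltinOperator() above: the function name is capitalized, one suffix is appended per argument type, and the symbol is qualified with DecimalOperators whenever a DECIMAL argument appears, e.g. Add(TINYINT, TINYINT) becomes impala::Operators::Add_TinyIntVal_TinyIntVal. The standalone sketch below reproduces that convention for illustration only; the OperatorSymbol class and its string-based type list are hypothetical and not part of the Impala code.

import java.util.Arrays;
import java.util.List;

// Illustrative re-implementation of the builtin-operator name mangling shown above.
// OperatorSymbol and the string type names are hypothetical helpers, not Impala API.
public class OperatorSymbol {
  static String suffixFor(String argType) {
    switch (argType) {
      case "BOOLEAN": return "_BooleanVal";
      case "TINYINT": return "_TinyIntVal";
      case "SMALLINT": return "_SmallIntVal";
      case "INT": return "_IntVal";
      case "BIGINT": return "_BigIntVal";
      case "FLOAT": return "_FloatVal";
      case "DOUBLE": return "_DoubleVal";
      case "STRING": case "VARCHAR": return "_StringVal";
      case "CHAR": return "_Char";
      case "TIMESTAMP": return "_TimestampVal";
      case "DECIMAL": return "_DecimalVal";
      default: throw new IllegalArgumentException("Argument type not supported: " + argType);
    }
  }

  static String symbol(String fnName, List<String> argTypes) {
    // Capitalize the function name, append one suffix per argument, and qualify with
    // the DecimalOperators class when any argument is DECIMAL.
    StringBuilder beFn = new StringBuilder(
        Character.toUpperCase(fnName.charAt(0)) + fnName.substring(1));
    boolean usesDecimal = false;
    for (String argType : argTypes) {
      beFn.append(suffixFor(argType));
      usesDecimal |= argType.equals("DECIMAL");
    }
    String beClass = usesDecimal ? "DecimalOperators" : "Operators";
    return "impala::" + beClass + "::" + beFn;
  }

  public static void main(String[] args) {
    // Prints: impala::Operators::Add_TinyIntVal_TinyIntVal
    System.out.println(symbol("add", Arrays.asList("TINYINT", "TINYINT")));
    // Prints: impala::DecimalOperators::Multiply_DecimalVal_DecimalVal
    System.out.println(symbol("multiply", Arrays.asList("DECIMAL", "DECIMAL")));
  }
}
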
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/ScalarType.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ScalarType.java b/fe/src/main/java/com/cloudera/impala/catalog/ScalarType.java
deleted file mode 100644
index ae955f8..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/ScalarType.java
+++ /dev/null
@@ -1,478 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import org.apache.commons.lang3.StringUtils;
-
-import com.cloudera.impala.analysis.TypesUtil;
-import com.cloudera.impala.thrift.TColumnType;
-import com.cloudera.impala.thrift.TScalarType;
-import com.cloudera.impala.thrift.TTypeNode;
-import com.cloudera.impala.thrift.TTypeNodeType;
-import com.google.common.base.Preconditions;
-
-/**
- * Describes a scalar type. For most types this class just wraps a PrimitiveType enum,
- * but for types like CHAR and DECIMAL, this class contains additional information.
- *
- * Scalar types have a few ways they can be compared to other scalar types. They can be:
- * 1. completely identical,
- * 2. implicitly castable (convertible without loss of precision), or
- * 3. a subtype. For example, in the case of decimal, a type can be decimal(*, *)
- * indicating that any decimal type is a subtype of the decimal type.
- */
-public class ScalarType extends Type {
- private final PrimitiveType type_;
-
- // Only used for type CHAR.
- private int len_;
-
- // Only used if type is DECIMAL. -1 (for both) is used to represent a
- // decimal with any precision and scale.
- // It is invalid to have one be -1 and not the other.
- // TODO: we could use that to store DECIMAL(8,*), indicating a decimal
- // with 8 digits of precision and any valid ([0-8]) scale.
- private int precision_;
- private int scale_;
-
- // SQL allows the engine to pick the default precision. We pick the largest
- // precision that is supported by the smallest decimal type in the BE (4 bytes).
- public static final int DEFAULT_PRECISION = 9;
- public static final int DEFAULT_SCALE = 0; // SQL standard
-
- // Longest supported VARCHAR and CHAR, chosen to match Hive.
- public static final int MAX_VARCHAR_LENGTH = 65355;
- public static final int MAX_CHAR_LENGTH = 255;
-
- // Longest CHAR that we store inline in the tuple.
- // Keep consistent with backend ColumnType::CHAR_INLINE_LENGTH
- public static final int CHAR_INLINE_LENGTH = 128;
-
- // Hive, MySQL, SQL Server standard.
- public static final int MAX_PRECISION = 38;
- public static final int MAX_SCALE = MAX_PRECISION;
-
- protected ScalarType(PrimitiveType type) {
- type_ = type;
- }
-
- public static ScalarType createType(PrimitiveType type) {
- switch (type) {
- case INVALID_TYPE: return INVALID;
- case NULL_TYPE: return NULL;
- case BOOLEAN: return BOOLEAN;
- case SMALLINT: return SMALLINT;
- case TINYINT: return TINYINT;
- case INT: return INT;
- case BIGINT: return BIGINT;
- case FLOAT: return FLOAT;
- case DOUBLE: return DOUBLE;
- case STRING: return STRING;
- case VARCHAR: return createVarcharType();
- case BINARY: return BINARY;
- case TIMESTAMP: return TIMESTAMP;
- case DATE: return DATE;
- case DATETIME: return DATETIME;
- case DECIMAL: return (ScalarType) createDecimalType();
- default:
- Preconditions.checkState(false);
- return NULL;
- }
- }
-
- public static ScalarType createCharType(int len) {
- ScalarType type = new ScalarType(PrimitiveType.CHAR);
- type.len_ = len;
- return type;
- }
-
- public static ScalarType createDecimalType() { return DEFAULT_DECIMAL; }
-
- public static ScalarType createDecimalType(int precision) {
- return createDecimalType(precision, DEFAULT_SCALE);
- }
-
- public static ScalarType createDecimalType(int precision, int scale) {
- Preconditions.checkState(precision >= 0); // Enforced by parser
- Preconditions.checkState(scale >= 0); // Enforced by parser.
- ScalarType type = new ScalarType(PrimitiveType.DECIMAL);
- type.precision_ = precision;
- type.scale_ = scale;
- return type;
- }
-
- // Identical to createDecimalType except that higher precisions are truncated
- // to the max storable precision. The BE will report overflow in these cases
- // (think of this as adding ints to BIGINT but BIGINT can still overflow).
- public static ScalarType createDecimalTypeInternal(int precision, int scale) {
- ScalarType type = new ScalarType(PrimitiveType.DECIMAL);
- type.precision_ = Math.min(precision, MAX_PRECISION);
- type.scale_ = Math.min(type.precision_, scale);
- return type;
- }
-
- public static ScalarType createVarcharType(int len) {
- // length checked in analysis
- ScalarType type = new ScalarType(PrimitiveType.VARCHAR);
- type.len_ = len;
- return type;
- }
-
- public static ScalarType createVarcharType() {
- return DEFAULT_VARCHAR;
- }
-
- @Override
- public String toString() {
- if (type_ == PrimitiveType.CHAR) {
- if (isWildcardChar()) return "CHAR(*)";
- return "CHAR(" + len_ + ")";
- } else if (type_ == PrimitiveType.DECIMAL) {
- if (isWildcardDecimal()) return "DECIMAL(*,*)";
- return "DECIMAL(" + precision_ + "," + scale_ + ")";
- } else if (type_ == PrimitiveType.VARCHAR) {
- if (isWildcardVarchar()) return "VARCHAR(*)";
- return "VARCHAR(" + len_ + ")";
- }
- return type_.toString();
- }
-
- @Override
- public String toSql(int depth) {
- if (depth >= MAX_NESTING_DEPTH) return "...";
- switch(type_) {
- case BINARY: return type_.toString();
- case VARCHAR:
- case CHAR:
- return type_.toString() + "(" + len_ + ")";
- case DECIMAL:
- return String.format("%s(%s,%s)", type_.toString(), precision_, scale_);
- default: return type_.toString();
- }
- }
-
- @Override
- protected String prettyPrint(int lpad) {
- return StringUtils.repeat(' ', lpad) + toSql();
- }
-
- @Override
- public void toThrift(TColumnType container) {
- TTypeNode node = new TTypeNode();
- container.types.add(node);
- switch(type_) {
- case VARCHAR:
- case CHAR: {
- node.setType(TTypeNodeType.SCALAR);
- TScalarType scalarType = new TScalarType();
- scalarType.setType(type_.toThrift());
- scalarType.setLen(len_);
- node.setScalar_type(scalarType);
- break;
- }
- case DECIMAL: {
- node.setType(TTypeNodeType.SCALAR);
- TScalarType scalarType = new TScalarType();
- scalarType.setType(type_.toThrift());
- scalarType.setScale(scale_);
- scalarType.setPrecision(precision_);
- node.setScalar_type(scalarType);
- break;
- }
- default: {
- node.setType(TTypeNodeType.SCALAR);
- TScalarType scalarType = new TScalarType();
- scalarType.setType(type_.toThrift());
- node.setScalar_type(scalarType);
- break;
- }
- }
- }
-
- public static Type[] toColumnType(PrimitiveType[] types) {
- Type result[] = new Type[types.length];
- for (int i = 0; i < types.length; ++i) {
- result[i] = createType(types[i]);
- }
- return result;
- }
-
- public int decimalPrecision() {
- Preconditions.checkState(type_ == PrimitiveType.DECIMAL);
- return precision_;
- }
-
- public int decimalScale() {
- Preconditions.checkState(type_ == PrimitiveType.DECIMAL);
- return scale_;
- }
-
- @Override
- public PrimitiveType getPrimitiveType() { return type_; }
- public int ordinal() { return type_.ordinal(); }
- public int getLength() { return len_; }
-
- @Override
- public boolean isWildcardDecimal() {
- return type_ == PrimitiveType.DECIMAL && precision_ == -1 && scale_ == -1;
- }
-
- @Override
- public boolean isWildcardVarchar() {
- return type_ == PrimitiveType.VARCHAR && len_ == -1;
- }
-
- @Override
- public boolean isWildcardChar() {
- return type_ == PrimitiveType.CHAR && len_ == -1;
- }
-
- /**
- * Returns true if this type is a fully specified (not wild card) decimal.
- */
- @Override
- public boolean isFullySpecifiedDecimal() {
- if (!isDecimal()) return false;
- if (isWildcardDecimal()) return false;
- if (precision_ <= 0 || precision_ > MAX_PRECISION) return false;
- if (scale_ < 0 || scale_ > precision_) return false;
- return true;
- }
-
- @Override
- public boolean isFixedLengthType() {
- return type_ == PrimitiveType.BOOLEAN || type_ == PrimitiveType.TINYINT
- || type_ == PrimitiveType.SMALLINT || type_ == PrimitiveType.INT
- || type_ == PrimitiveType.BIGINT || type_ == PrimitiveType.FLOAT
- || type_ == PrimitiveType.DOUBLE || type_ == PrimitiveType.DATE
- || type_ == PrimitiveType.DATETIME || type_ == PrimitiveType.TIMESTAMP
- || type_ == PrimitiveType.CHAR || type_ == PrimitiveType.DECIMAL;
- }
-
- @Override
- public boolean isSupported() {
- return !getUnsupportedTypes().contains(this);
- }
-
- @Override
- public boolean supportsTablePartitioning() {
- if (!isSupported() || isComplexType() || type_ == PrimitiveType.TIMESTAMP) {
- return false;
- }
- return true;
- }
-
- @Override
- public int getSlotSize() {
- switch (type_) {
- case CHAR:
- if (len_ > CHAR_INLINE_LENGTH || len_ == 0) return STRING.getSlotSize();
- return len_;
- case DECIMAL: return TypesUtil.getDecimalSlotSize(this);
- default:
- return type_.getSlotSize();
- }
- }
-
- /**
- * Returns true if this object is of type t.
- * Handles wildcard types. That is, if t is the wildcard type variant
- * of 'this', returns true.
- */
- @Override
- public boolean matchesType(Type t) {
- if (equals(t)) return true;
- if (!t.isScalarType()) return false;
- ScalarType scalarType = (ScalarType) t;
- if (type_ == PrimitiveType.VARCHAR && scalarType.isWildcardVarchar()) {
- Preconditions.checkState(!isWildcardVarchar());
- return true;
- }
- if (type_ == PrimitiveType.CHAR && scalarType.isWildcardChar()) {
- Preconditions.checkState(!isWildcardChar());
- return true;
- }
- if (isDecimal() && scalarType.isWildcardDecimal()) {
- Preconditions.checkState(!isWildcardDecimal());
- return true;
- }
- return false;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof ScalarType)) return false;
- ScalarType other = (ScalarType)o;
- if (type_ != other.type_) return false;
- if (type_ == PrimitiveType.CHAR) return len_ == other.len_;
- if (type_ == PrimitiveType.VARCHAR) return len_ == other.len_;
- if (type_ == PrimitiveType.DECIMAL) {
- return precision_ == other.precision_ && scale_ == other.scale_;
- }
- return true;
- }
-
- public Type getMaxResolutionType() {
- if (isIntegerType()) {
- return ScalarType.BIGINT;
- // Timestamps get summed as DOUBLE for AVG.
- } else if (isFloatingPointType() || type_ == PrimitiveType.TIMESTAMP) {
- return ScalarType.DOUBLE;
- } else if (isNull()) {
- return ScalarType.NULL;
- } else if (isDecimal()) {
- return createDecimalTypeInternal(MAX_PRECISION, scale_);
- } else {
- return ScalarType.INVALID;
- }
- }
-
- public ScalarType getNextResolutionType() {
- Preconditions.checkState(isNumericType() || isNull());
- if (type_ == PrimitiveType.DOUBLE || type_ == PrimitiveType.BIGINT || isNull()) {
- return this;
- } else if (type_ == PrimitiveType.DECIMAL) {
- return createDecimalTypeInternal(MAX_PRECISION, scale_);
- }
- return createType(PrimitiveType.values()[type_.ordinal() + 1]);
- }
-
- /**
- * Returns the smallest decimal type that can safely store this type. Returns
- * INVALID if this type cannot be stored as a decimal.
- */
- public ScalarType getMinResolutionDecimal() {
- switch (type_) {
- case NULL_TYPE: return Type.NULL;
- case DECIMAL: return this;
- case TINYINT: return createDecimalType(3);
- case SMALLINT: return createDecimalType(5);
- case INT: return createDecimalType(10);
- case BIGINT: return createDecimalType(19);
- case FLOAT: return createDecimalTypeInternal(MAX_PRECISION, 9);
- case DOUBLE: return createDecimalTypeInternal(MAX_PRECISION, 17);
- default: return ScalarType.INVALID;
- }
- }
-
- /**
- * Returns true if this decimal type is a supertype of the other decimal type.
- * e.g. (10,3) is a supertype of (3,3) but (5,4) is not a supertype of (3,0).
- * To be a supertype of another decimal, the number of digits both before and after
- * the decimal point must be greater than or equal.
- */
- public boolean isSupertypeOf(ScalarType o) {
- Preconditions.checkState(isDecimal());
- Preconditions.checkState(o.isDecimal());
- if (isWildcardDecimal()) return true;
- if (o.isWildcardDecimal()) return false;
- return scale_ >= o.scale_ && precision_ - scale_ >= o.precision_ - o.scale_;
- }
-
- /**
- * Returns a type t such that values from both t1 and t2 can be assigned to t.
- * If strict is true, only return a type when there will be no loss of precision.
- * Returns INVALID_TYPE if there is no such type or if either of t1 and t2
- * is INVALID_TYPE.
- */
- public static ScalarType getAssignmentCompatibleType(ScalarType t1,
- ScalarType t2, boolean strict) {
- if (!t1.isValid() || !t2.isValid()) return INVALID;
- if (t1.equals(t2)) return t1;
- if (t1.isNull()) return t2;
- if (t2.isNull()) return t1;
-
- if (t1.type_ == PrimitiveType.VARCHAR || t2.type_ == PrimitiveType.VARCHAR) {
- if (t1.type_ == PrimitiveType.STRING || t2.type_ == PrimitiveType.STRING) {
- return STRING;
- }
- if (t1.isStringType() && t2.isStringType()) {
- return createVarcharType(Math.max(t1.len_, t2.len_));
- }
- return INVALID;
- }
-
- if (t1.type_ == PrimitiveType.CHAR || t2.type_ == PrimitiveType.CHAR) {
- Preconditions.checkState(t1.type_ != PrimitiveType.VARCHAR);
- Preconditions.checkState(t2.type_ != PrimitiveType.VARCHAR);
- if (t1.type_ == PrimitiveType.STRING || t2.type_ == PrimitiveType.STRING) {
- return STRING;
- }
- if (t1.type_ == PrimitiveType.CHAR && t2.type_ == PrimitiveType.CHAR) {
- return createCharType(Math.max(t1.len_, t2.len_));
- }
- return INVALID;
- }
-
- if (t1.isDecimal() || t2.isDecimal()) {
- // The case of decimal and float/double must be handled carefully. There are two
- // modes: strict and non-strict. In non-strict mode, we convert to the floating
- // point type, since it can contain a larger range of values than any decimal (but
- // has lower precision in some parts of its range), so it is generally better.
- // In strict mode, we avoid conversion in either direction because there are also
- // decimal values (e.g. 0.1) that cannot be exactly represented in binary
- // floating point.
- // TODO: it might make sense to promote to double in many cases, but this would
- // require more work elsewhere to avoid breaking things, e.g. inserting decimal
- // literals into float columns.
- if (t1.isFloatingPointType()) return strict ? INVALID : t1;
- if (t2.isFloatingPointType()) return strict ? INVALID : t2;
-
- // Allow casts between decimal and numeric types by converting
- // numeric types to the containing decimal type.
- ScalarType t1Decimal = t1.getMinResolutionDecimal();
- ScalarType t2Decimal = t2.getMinResolutionDecimal();
- if (t1Decimal.isInvalid() || t2Decimal.isInvalid()) return Type.INVALID;
- Preconditions.checkState(t1Decimal.isDecimal());
- Preconditions.checkState(t2Decimal.isDecimal());
-
- if (t1Decimal.equals(t2Decimal)) {
- Preconditions.checkState(!(t1.isDecimal() && t2.isDecimal()));
- // The containing decimal type for a non-decimal type is always an exclusive
- // upper bound, therefore the decimal has higher precision.
- return t1Decimal;
- }
- if (t1Decimal.isSupertypeOf(t2Decimal)) return t1;
- if (t2Decimal.isSupertypeOf(t1Decimal)) return t2;
- return TypesUtil.getDecimalAssignmentCompatibleType(t1Decimal, t2Decimal);
- }
-
- PrimitiveType smallerType =
- (t1.type_.ordinal() < t2.type_.ordinal() ? t1.type_ : t2.type_);
- PrimitiveType largerType =
- (t1.type_.ordinal() > t2.type_.ordinal() ? t1.type_ : t2.type_);
- PrimitiveType result = null;
- if (strict) {
- result = strictCompatibilityMatrix[smallerType.ordinal()][largerType.ordinal()];
- }
- if (result == null) {
- result = compatibilityMatrix[smallerType.ordinal()][largerType.ordinal()];
- }
- Preconditions.checkNotNull(result);
- return createType(result);
- }
-
- /**
- * Returns true if t1 can be implicitly cast to t2, false otherwise.
- * If strict is true, only consider casts that result in no loss of precision.
- */
- public static boolean isImplicitlyCastable(ScalarType t1, ScalarType t2,
- boolean strict) {
- return getAssignmentCompatibleType(t1, t2, strict).matchesType(t2);
- }
-}
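
The decimal supertype rule in isSupertypeOf() above reduces to comparing the digit counts before and after the decimal point. The tiny standalone check below mirrors that rule; the DecimalSupertype class is a hypothetical stand-in, not part of Impala.

// Illustrative re-implementation of ScalarType.isSupertypeOf() above: DECIMAL(p1,s1)
// is a supertype of DECIMAL(p2,s2) iff it has at least as many digits both after the
// decimal point (scale) and before it (precision - scale). Hypothetical helper class.
public class DecimalSupertype {
  static boolean isSupertypeOf(int p1, int s1, int p2, int s2) {
    return s1 >= s2 && (p1 - s1) >= (p2 - s2);
  }

  public static void main(String[] args) {
    System.out.println(isSupertypeOf(10, 3, 3, 3)); // true: (10,3) covers (3,3)
    System.out.println(isSupertypeOf(5, 4, 3, 0));  // false: 1 digit before the point vs 3
  }
}
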
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/StructField.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/StructField.java b/fe/src/main/java/com/cloudera/impala/catalog/StructField.java
deleted file mode 100644
index 4b9cb80..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/StructField.java
+++ /dev/null
@@ -1,92 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import org.apache.commons.lang3.StringUtils;
-
-import com.cloudera.impala.thrift.TColumnType;
-import com.cloudera.impala.thrift.TStructField;
-import com.cloudera.impala.thrift.TTypeNode;
-
-/**
- * TODO: Support comments for struct fields. The Metastore does not properly store
- * comments of struct fields. We set comment_ to null to avoid compatibility issues.
- */
-public class StructField {
- protected final String name_;
- protected final Type type_;
- protected final String comment_;
- protected int position_; // in struct
-
- public StructField(String name, Type type, String comment) {
- name_ = name;
- type_ = type;
- comment_ = comment;
- }
-
- public StructField(String name, Type type) {
- this(name, type, null);
- }
-
- public String getComment() { return comment_; }
- public String getName() { return name_; }
- public Type getType() { return type_; }
- public int getPosition() { return position_; }
- public void setPosition(int position) { position_ = position; }
-
- public String toSql(int depth) {
- String typeSql = (depth < Type.MAX_NESTING_DEPTH) ? type_.toSql(depth) : "...";
- StringBuilder sb = new StringBuilder(name_);
- if (type_ != null) sb.append(":" + typeSql);
- if (comment_ != null) sb.append(String.format(" COMMENT '%s'", comment_));
- return sb.toString();
- }
-
- /**
- * Pretty prints this field with lpad number of leading spaces.
- * Calls prettyPrint(lpad) on this field's type.
- */
- public String prettyPrint(int lpad) {
- String leftPadding = StringUtils.repeat(' ', lpad);
- StringBuilder sb = new StringBuilder(leftPadding + name_);
- if (type_ != null) {
- // Pass in the padding to make sure nested fields are aligned properly,
- // even if we then strip the top-level padding.
- String typeStr = type_.prettyPrint(lpad);
- typeStr = typeStr.substring(lpad);
- sb.append(":" + typeStr);
- }
- if (comment_ != null) sb.append(String.format(" COMMENT '%s'", comment_));
- return sb.toString();
- }
-
- public void toThrift(TColumnType container, TTypeNode node) {
- TStructField field = new TStructField();
- field.setName(name_);
- if (comment_ != null) field.setComment(comment_);
- node.struct_fields.add(field);
- type_.toThrift(container);
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof StructField)) return false;
- StructField otherStructField = (StructField) other;
- return otherStructField.name_.equals(name_) && otherStructField.type_.equals(type_);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/StructType.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/StructType.java b/fe/src/main/java/com/cloudera/impala/catalog/StructType.java
deleted file mode 100644
index 3600b6b..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/StructType.java
+++ /dev/null
@@ -1,107 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.commons.lang3.StringUtils;
-
-import com.cloudera.impala.thrift.TColumnType;
-import com.cloudera.impala.thrift.TStructField;
-import com.cloudera.impala.thrift.TTypeNode;
-import com.cloudera.impala.thrift.TTypeNodeType;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Describes a STRUCT type. STRUCT types have a list of named struct fields.
- */
-public class StructType extends Type {
- private final HashMap<String, StructField> fieldMap_ = Maps.newHashMap();
- private final ArrayList<StructField> fields_;
-
- public StructType(ArrayList<StructField> fields) {
- Preconditions.checkNotNull(fields);
- fields_ = fields;
- for (int i = 0; i < fields_.size(); ++i) {
- fields_.get(i).setPosition(i);
- fieldMap_.put(fields_.get(i).getName().toLowerCase(), fields_.get(i));
- }
- }
-
- public StructType() {
- fields_ = Lists.newArrayList();
- }
-
- @Override
- public String toSql(int depth) {
- if (depth >= MAX_NESTING_DEPTH) return "STRUCT<...>";
- ArrayList<String> fieldsSql = Lists.newArrayList();
- for (StructField f: fields_) fieldsSql.add(f.toSql(depth + 1));
- return String.format("STRUCT<%s>", Joiner.on(",").join(fieldsSql));
- }
-
- @Override
- protected String prettyPrint(int lpad) {
- String leftPadding = StringUtils.repeat(' ', lpad);
- ArrayList<String> fieldsSql = Lists.newArrayList();
- for (StructField f: fields_) fieldsSql.add(f.prettyPrint(lpad + 2));
- return String.format("%sSTRUCT<\n%s\n%s>",
- leftPadding, Joiner.on(",\n").join(fieldsSql), leftPadding);
- }
-
- public void addField(StructField field) {
- field.setPosition(fields_.size());
- fields_.add(field);
- fieldMap_.put(field.getName().toLowerCase(), field);
- }
-
- public ArrayList<StructField> getFields() { return fields_; }
-
- public StructField getField(String fieldName) {
- return fieldMap_.get(fieldName.toLowerCase());
- }
-
- public void clearFields() {
- fields_.clear();
- fieldMap_.clear();
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof StructType)) return false;
- StructType otherStructType = (StructType) other;
- return otherStructType.getFields().equals(fields_);
- }
-
- @Override
- public void toThrift(TColumnType container) {
- TTypeNode node = new TTypeNode();
- container.types.add(node);
- Preconditions.checkNotNull(fields_);
- Preconditions.checkState(!fields_.isEmpty());
- node.setType(TTypeNodeType.STRUCT);
- node.setStruct_fields(new ArrayList<TStructField>());
- for (StructField field: fields_) {
- field.toThrift(container, node);
- }
- }
-}
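
The SQL rendering in StructType.toSql() above simply joins each field's name:type pair with commas inside STRUCT<...>, recursing into nested types. A self-contained sketch of that rendering follows; StructSqlDemo and its Field class are illustrative stand-ins for the deleted StructType/StructField, not the real classes.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch of the STRUCT SQL rendering shown above. Field is a hypothetical
// stand-in for StructField, not the Impala class.
public class StructSqlDemo {
  static class Field {
    final String name;
    final String typeSql;
    Field(String name, String typeSql) { this.name = name; this.typeSql = typeSql; }
    String toSql() { return name + ":" + typeSql; }
  }

  // Mirrors StructType.toSql(): join each field's "name:type" with commas inside STRUCT<...>.
  static String structToSql(List<Field> fields) {
    return "STRUCT<"
        + fields.stream().map(Field::toSql).collect(Collectors.joining(","))
        + ">";
  }

  public static void main(String[] args) {
    String inner = structToSql(Arrays.asList(new Field("lat", "DOUBLE"), new Field("lon", "DOUBLE")));
    String outer = structToSql(Arrays.asList(new Field("id", "INT"), new Field("location", inner)));
    // Prints: STRUCT<id:INT,location:STRUCT<lat:DOUBLE,lon:DOUBLE>>
    System.out.println(outer);
  }
}
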
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Table.java b/fe/src/main/java/com/cloudera/impala/catalog/Table.java
deleted file mode 100644
index f794d7e..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/Table.java
+++ /dev/null
@@ -1,484 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.log4j.Logger;
-
-import com.cloudera.impala.analysis.TableName;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.common.Pair;
-import com.cloudera.impala.thrift.TAccessLevel;
-import com.cloudera.impala.thrift.TCatalogObject;
-import com.cloudera.impala.thrift.TCatalogObjectType;
-import com.cloudera.impala.thrift.TColumn;
-import com.cloudera.impala.thrift.TColumnDescriptor;
-import com.cloudera.impala.thrift.TTable;
-import com.cloudera.impala.thrift.TTableDescriptor;
-import com.cloudera.impala.thrift.TTableStats;
-import com.cloudera.impala.util.HdfsCachingUtil;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Base class for table metadata.
- *
- * This includes the concept of clustering columns, which are columns by which the table
- * data is physically clustered. In other words, if two rows share the same values
- * for the clustering columns, those two rows are most likely colocated. Note that this
- * is more general than Hive's CLUSTER BY ... INTO BUCKETS clause (which partitions
- * a key range into a fixed number of buckets).
- */
-public abstract class Table implements CatalogObject {
- private static final Logger LOG = Logger.getLogger(Table.class);
-
- // Lock used to serialize calls to the Hive MetaStore to work around MetaStore
- // concurrency bugs. Currently used to serialize calls to "getTable()" due to HIVE-5457.
- private static final Object metastoreAccessLock_ = new Object();
- private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
- protected org.apache.hadoop.hive.metastore.api.Table msTable_;
-
- protected final TableId id_;
- protected final Db db_;
- protected final String name_;
- protected final String owner_;
- protected TTableDescriptor tableDesc_;
- protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
-
- // Number of clustering columns.
- protected int numClusteringCols_;
-
- // estimated number of rows in table; -1: unknown.
- protected long numRows_ = -1;
-
- // colsByPos[i] refers to the ith column in the table. The first numClusteringCols are
- // the clustering columns.
- protected final ArrayList<Column> colsByPos_ = Lists.newArrayList();
-
- // map from lowercase column name to Column object.
- private final Map<String, Column> colsByName_ = Maps.newHashMap();
-
- // Type of this table (array of struct) that mirrors the columns. Useful for analysis.
- protected final ArrayType type_ = new ArrayType(new StructType());
-
- // The lastDdlTime for this table; -1 if not set
- protected long lastDdlTime_;
-
- // Set of supported table types.
- protected static EnumSet<TableType> SUPPORTED_TABLE_TYPES = EnumSet.of(
- TableType.EXTERNAL_TABLE, TableType.MANAGED_TABLE, TableType.VIRTUAL_VIEW);
-
- protected Table(TableId id, org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
- String name, String owner) {
- id_ = id;
- msTable_ = msTable;
- db_ = db;
- name_ = name.toLowerCase();
- owner_ = owner;
- lastDdlTime_ = (msTable_ != null) ?
- CatalogServiceCatalog.getLastDdlTime(msTable_) : -1;
- }
-
- public abstract TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions);
- public abstract TCatalogObjectType getCatalogObjectType();
-
- /**
- * Populate members of 'this' from metastore info. If 'reuseMetadata' is true, reuse
- * valid existing metadata.
- */
- public abstract void load(boolean reuseMetadata, IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException;
-
- public void addColumn(Column col) {
- colsByPos_.add(col);
- colsByName_.put(col.getName().toLowerCase(), col);
- ((StructType) type_.getItemType()).addField(
- new StructField(col.getName(), col.getType(), col.getComment()));
- }
-
- public void clearColumns() {
- colsByPos_.clear();
- colsByName_.clear();
- ((StructType) type_.getItemType()).clearFields();
- }
-
- /**
- * Updates the lastDdlTime for this Table, if the new value is greater
- * than the existing value. Does nothing if the new value is less than
- * or equal to the existing value.
- */
- public void updateLastDdlTime(long ddlTime) {
- // Ensure the lastDdlTime never goes backwards.
- if (ddlTime > lastDdlTime_) lastDdlTime_ = ddlTime;
- }
-
- // Returns a list of all column names for this table which we expect to have column
- // stats in the HMS. This exists because, when we request the column stats from HMS,
- // including a column name that does not have stats causes
- // getTableColumnStatistics() to return nothing. For HDFS tables, partition columns do
- // not have column stats in the HMS, but HBase table clustering columns do have column
- // stats. This method allows each table type to volunteer the set of columns we should
- // ask the metastore for in loadAllColumnStats().
- protected List<String> getColumnNamesWithHmsStats() {
- List<String> ret = Lists.newArrayList();
- for (String name: colsByName_.keySet()) ret.add(name);
- return ret;
- }
-
- /**
- * Loads column statistics for all columns in this table from the Hive metastore. Any
- * errors are logged and ignored, since the absence of column stats is not critical to
- * the correctness of the system.
- */
- protected void loadAllColumnStats(IMetaStoreClient client) {
- LOG.debug("Loading column stats for table: " + name_);
- List<ColumnStatisticsObj> colStats;
-
- // We need to only query those columns which may have stats; asking HMS for other
- // columns causes loadAllColumnStats() to return nothing.
- List<String> colNames = getColumnNamesWithHmsStats();
-
- try {
- colStats = client.getTableColumnStatistics(db_.getName(), name_, colNames);
- } catch (Exception e) {
- LOG.warn("Could not load column statistics for: " + getFullName(), e);
- return;
- }
-
- for (ColumnStatisticsObj stats: colStats) {
- Column col = getColumn(stats.getColName());
- Preconditions.checkNotNull(col);
- if (!ColumnStats.isSupportedColType(col.getType())) {
- LOG.warn(String.format("Statistics for %s, column %s are not supported as " +
- "column has type %s", getFullName(), col.getName(), col.getType()));
- continue;
- }
-
- if (!col.updateStats(stats.getStatsData())) {
- LOG.warn(String.format("Failed to load column stats for %s, column %s. Stats " +
- "may be incompatible with column type %s. Consider regenerating statistics " +
- "for %s.", getFullName(), col.getName(), col.getType(), getFullName()));
- continue;
- }
- }
- }
-
- /**
- * Returns the value of the ROW_COUNT constant, or -1 if not found.
- */
- protected static long getRowCount(Map<String, String> parameters) {
- if (parameters == null) return -1;
- String numRowsStr = parameters.get(StatsSetupConst.ROW_COUNT);
- if (numRowsStr == null) return -1;
- try {
- return Long.valueOf(numRowsStr);
- } catch (NumberFormatException exc) {
- // ignore
- }
- return -1;
- }
-
- /**
- * Creates a table of the appropriate type based on the given hive.metastore.api.Table
- * object.
- */
- public static Table fromMetastoreTable(TableId id, Db db,
- org.apache.hadoop.hive.metastore.api.Table msTbl) {
- // Create a table of appropriate type
- Table table = null;
- if (TableType.valueOf(msTbl.getTableType()) == TableType.VIRTUAL_VIEW) {
- table = new View(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner());
- } else if (HBaseTable.isHBaseTable(msTbl)) {
- table = new HBaseTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner());
- } else if (KuduTable.isKuduTable(msTbl)) {
- table = new KuduTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner());
- } else if (DataSourceTable.isDataSourceTable(msTbl)) {
- // It's important to check if this is a DataSourceTable before HdfsTable because
- // DataSourceTables are still represented by HDFS tables in the metastore but
- // have a special table property to indicate that Impala should use an external
- // data source.
- table = new DataSourceTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner());
- } else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) {
- table = new HdfsTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner());
- }
- return table;
- }
-
- /**
- * Factory method that creates a new Table from its Thrift representation.
- * Determines the type of table to create based on the Thrift table provided.
- */
- public static Table fromThrift(Db parentDb, TTable thriftTable)
- throws TableLoadingException {
- Table newTable;
- if (!thriftTable.isSetLoad_status() && thriftTable.isSetMetastore_table()) {
- newTable = Table.fromMetastoreTable(new TableId(thriftTable.getId()),
- parentDb, thriftTable.getMetastore_table());
- } else {
- newTable = IncompleteTable.createUninitializedTable(
- TableId.createInvalidId(), parentDb, thriftTable.getTbl_name());
- }
- newTable.loadFromThrift(thriftTable);
- newTable.validate();
- return newTable;
- }
-
- protected void loadFromThrift(TTable thriftTable) throws TableLoadingException {
- List<TColumn> columns = new ArrayList<TColumn>();
- columns.addAll(thriftTable.getClustering_columns());
- columns.addAll(thriftTable.getColumns());
-
- colsByPos_.clear();
- colsByPos_.ensureCapacity(columns.size());
- for (int i = 0; i < columns.size(); ++i) {
- Column col = Column.fromThrift(columns.get(i));
- colsByPos_.add(col.getPosition(), col);
- colsByName_.put(col.getName().toLowerCase(), col);
- ((StructType) type_.getItemType()).addField(
- new StructField(col.getName(), col.getType(), col.getComment()));
- }
-
- numClusteringCols_ = thriftTable.getClustering_columns().size();
-
- // Estimated number of rows
- numRows_ = thriftTable.isSetTable_stats() ?
- thriftTable.getTable_stats().getNum_rows() : -1;
-
- // Default to READ_WRITE access if the field is not set.
- accessLevel_ = thriftTable.isSetAccess_level() ? thriftTable.getAccess_level() :
- TAccessLevel.READ_WRITE;
- }
-
- /**
- * Checks preconditions for this table to function as expected. Currently only checks
- * that all entries in colsByName_ use lower case keys.
- */
- public void validate() throws TableLoadingException {
- for (String colName: colsByName_.keySet()) {
- if (!colName.equals(colName.toLowerCase())) {
- throw new TableLoadingException(
- "Expected lower case column name but found: " + colName);
- }
- }
- }
-
- public TTable toThrift() {
- TTable table = new TTable(db_.getName(), name_);
- table.setId(id_.asInt());
- table.setAccess_level(accessLevel_);
-
- // Populate both regular columns and clustering columns (if there are any).
- table.setColumns(new ArrayList<TColumn>());
- table.setClustering_columns(new ArrayList<TColumn>());
- for (int i = 0; i < colsByPos_.size(); ++i) {
- TColumn colDesc = colsByPos_.get(i).toThrift();
- // Clustering columns come first.
- if (i < numClusteringCols_) {
- table.addToClustering_columns(colDesc);
- } else {
- table.addToColumns(colDesc);
- }
- }
-
- table.setMetastore_table(getMetaStoreTable());
- if (numRows_ != -1) {
- table.setTable_stats(new TTableStats());
- table.getTable_stats().setNum_rows(numRows_);
- }
- return table;
- }
-
- public TCatalogObject toTCatalogObject() {
- TCatalogObject catalogObject = new TCatalogObject();
- catalogObject.setType(getCatalogObjectType());
- catalogObject.setCatalog_version(getCatalogVersion());
- catalogObject.setTable(toThrift());
- return catalogObject;
- }
-
- /**
- * Gets the ColumnType from the given FieldSchema by using Impala's SqlParser.
- * Throws a TableLoadingException if the FieldSchema could not be parsed.
- * The type can be one of the following:
- * - Supported by Impala, in which case the type is returned.
- * - A type Impala understands but does not yet implement (e.g. date); the type is
- *   returned but type.isSupported() returns false.
- * - A supported type that exceeds an Impala limit, e.g., on the nesting depth.
- * - A type Impala can't understand at all, in which case a TableLoadingException is thrown.
- */
- protected Type parseColumnType(FieldSchema fs) throws TableLoadingException {
- Type type = Type.parseColumnType(fs.getType());
- if (type == null) {
- throw new TableLoadingException(String.format(
- "Unsupported type '%s' in column '%s' of table '%s'",
- fs.getType(), fs.getName(), getName()));
- }
- if (type.exceedsMaxNestingDepth()) {
- throw new TableLoadingException(String.format(
- "Type exceeds the maximum nesting depth of %s:\n%s",
- Type.MAX_NESTING_DEPTH, type.toSql()));
- }
- return type;
- }
-
- public Db getDb() { return db_; }
- public String getName() { return name_; }
- public String getFullName() { return (db_ != null ? db_.getName() + "." : "") + name_; }
- public TableName getTableName() {
- return new TableName(db_ != null ? db_.getName() : null, name_);
- }
-
- public String getOwner() { return owner_; }
- public ArrayList<Column> getColumns() { return colsByPos_; }
-
- /**
- * Returns a list of the column names ordered by position.
- */
- public List<String> getColumnNames() {
- List<String> colNames = Lists.<String>newArrayList();
- for (Column col: colsByPos_) {
- colNames.add(col.getName());
- }
- return colNames;
- }
-
- /**
- * Returns a list of thrift column descriptors ordered by position.
- */
- public List<TColumnDescriptor> getTColumnDescriptors() {
- List<TColumnDescriptor> colDescs = Lists.<TColumnDescriptor>newArrayList();
- for (Column col: colsByPos_) {
- colDescs.add(new TColumnDescriptor(col.getName(), col.getType().toThrift()));
- }
- return colDescs;
- }
-
- /**
- * Subclasses should override this if they provide a storage handler class. Currently
- * only HBase tables need to provide a storage handler.
- */
- public String getStorageHandlerClassName() { return null; }
-
- /**
- * Returns the list of all columns, but with partition columns at the end of
- * the list rather than the beginning. This is equivalent to the order in
- * which Hive enumerates columns.
- */
- public ArrayList<Column> getColumnsInHiveOrder() {
- ArrayList<Column> columns = Lists.newArrayList(getNonClusteringColumns());
- columns.addAll(getClusteringColumns());
- return columns;
- }
-
- /**
- * Returns a struct type with the columns in the same order as getColumnsInHiveOrder().
- */
- public StructType getHiveColumnsAsStruct() {
- ArrayList<StructField> fields = Lists.newArrayListWithCapacity(colsByPos_.size());
- for (Column col: getColumnsInHiveOrder()) {
- fields.add(new StructField(col.getName(), col.getType(), col.getComment()));
- }
- return new StructType(fields);
- }
-
- /**
- * Returns the list of all partition columns.
- */
- public List<Column> getClusteringColumns() {
- return colsByPos_.subList(0, numClusteringCols_);
- }
-
- /**
- * Returns the list of all columns excluding any partition columns.
- */
- public List<Column> getNonClusteringColumns() {
- return colsByPos_.subList(numClusteringCols_, colsByPos_.size());
- }
-
- /**
- * Case-insensitive lookup.
- */
- public Column getColumn(String name) { return colsByName_.get(name.toLowerCase()); }
-
- /**
- * Returns the metastore.api.Table object this Table was created from. Returns null
- * if the derived Table object was not created from a metastore Table (ex. InlineViews).
- */
- public org.apache.hadoop.hive.metastore.api.Table getMetaStoreTable() {
- return msTable_;
- }
-
- public void setMetaStoreTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
- msTable_ = msTbl;
- }
-
- public int getNumClusteringCols() { return numClusteringCols_; }
- public TableId getId() { return id_; }
- public long getNumRows() { return numRows_; }
- public ArrayType getType() { return type_; }
-
- @Override
- public long getCatalogVersion() { return catalogVersion_; }
-
- @Override
- public void setCatalogVersion(long catalogVersion) {
- catalogVersion_ = catalogVersion;
- }
-
- @Override
- public boolean isLoaded() { return true; }
-
- /**
- * If the table is cached, it returns a <cache pool name, replication factor> pair
- * and adds the table's cache directive ID to 'cacheDirIds'. Otherwise, it
- * returns a <null, 0> pair.
- */
- public Pair<String, Short> getTableCacheInfo(List<Long> cacheDirIds) {
- String cachePoolName = null;
- Short cacheReplication = 0;
- Long cacheDirId = HdfsCachingUtil.getCacheDirectiveId(msTable_.getParameters());
- if (cacheDirId != null) {
- try {
- cachePoolName = HdfsCachingUtil.getCachePool(cacheDirId);
- cacheReplication = HdfsCachingUtil.getCacheReplication(cacheDirId);
- Preconditions.checkNotNull(cacheReplication);
- if (numClusteringCols_ == 0) cacheDirIds.add(cacheDirId);
- } catch (ImpalaRuntimeException e) {
- // Catch the error so that the actual update to the catalog can proceed;
- // note that this resets caching for the table.
- LOG.error(
- String.format("Cache directive %d was not found, uncache the table %s " +
- "to remove this message.", cacheDirId, getFullName()));
- cacheDirId = null;
- }
- }
- return new Pair<String, Short>(cachePoolName, cacheReplication);
- }
-}
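
Table.getColumnsInHiveOrder() above relies on colsByPos_ holding the clustering (partition) columns first; Hive instead enumerates them last, so the method emits the non-clustering columns followed by the clustering ones. Below is a small standalone sketch of that reordering; the column names and clustering-column count are made up for illustration and are not taken from any real table.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of Table.getColumnsInHiveOrder() above: partition (clustering)
// columns are stored first in positional order, but Hive enumerates them last.
public class HiveColumnOrder {
  static List<String> columnsInHiveOrder(List<String> colsByPos, int numClusteringCols) {
    // Non-clustering columns first, then the clustering columns.
    List<String> result = new ArrayList<>(colsByPos.subList(numClusteringCols, colsByPos.size()));
    result.addAll(colsByPos.subList(0, numClusteringCols));
    return result;
  }

  public static void main(String[] args) {
    List<String> colsByPos = Arrays.asList("year", "month", "id", "amount");
    // Prints: [id, amount, year, month]
    System.out.println(columnsInHiveOrder(colsByPos, 2));
  }
}
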
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/TableId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/TableId.java b/fe/src/main/java/com/cloudera/impala/catalog/TableId.java
deleted file mode 100644
index 1918029..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/TableId.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import com.cloudera.impala.common.Id;
-import com.cloudera.impala.common.IdGenerator;
-
-public class TableId extends Id<TableId> {
- // Construction only allowed via an IdGenerator.
- protected TableId(int id) {
- super(id);
- }
-
- public static IdGenerator<TableId> createGenerator() {
- return new IdGenerator<TableId>() {
- @Override
- public TableId getNextId() { return new TableId(nextId_++); }
- @Override
- public TableId getMaxId() { return new TableId(nextId_ - 1); }
- };
- }
-
- /**
- * Returns an invalid table id intended for temporary use, e.g., for CTAS.
- */
- public static TableId createInvalidId() { return new TableId(INVALID_ID); }
-}
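For context, the Id/IdGenerator pattern above is used by holding one generator per owner
and asking it for fresh ids; since the TableId constructor is protected, ids can only be
handed out through the generator. A minimal sketch (the class, field, and method names
below are hypothetical):

    // Sketch only: how a catalog-like owner allocates TableIds.
    public class TableIdAllocatorSketch {
      // One generator per owner instance.
      private final IdGenerator<TableId> tableIdGenerator_ = TableId.createGenerator();

      public TableId getNextTableId() { return tableIdGenerator_.getNextId(); }

      // Temporary tables (e.g. the target table of a CTAS) use the sentinel id.
      public TableId getTemporaryId() { return TableId.createInvalidId(); }
    }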
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java b/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java
deleted file mode 100644
index 78b58f6..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java
+++ /dev/null
@@ -1,101 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.EnumSet;
-import java.util.Set;
-
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.log4j.Logger;
-
-import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient;
-
-/**
- * Class that implements the logic for how a table's metadata should be loaded from
- * the Hive Metastore / HDFS / etc.
- */
-public class TableLoader {
- private static final Logger LOG = Logger.getLogger(TableLoader.class);
-
- // Set of supported table types.
- private static EnumSet<TableType> SUPPORTED_TABLE_TYPES = EnumSet.of(
- TableType.EXTERNAL_TABLE, TableType.MANAGED_TABLE, TableType.VIRTUAL_VIEW);
-
- private final CatalogServiceCatalog catalog_;
-
- // Lock used to serialize calls to the Hive MetaStore to work around MetaStore
- // concurrency bugs. Currently used to serialize calls to "getTable()" due to
- // HIVE-5457.
- private static final Object metastoreAccessLock_ = new Object();
-
- public TableLoader(CatalogServiceCatalog catalog) {
- catalog_ = catalog;
- }
-
- /**
- * Creates the Impala representation of Hive/HBase metadata for one table.
- * Calls load() on the appropriate Table subclass instance.
- * Returns a new instance of Table. If any errors were encountered while loading the
- * table metadata, an IncompleteTable is returned that contains details on the error.
- */
- public Table load(Db db, String tblName) {
- String fullTblName = db.getName() + "." + tblName;
- LOG.info("Loading metadata for: " + fullTblName);
- Table table;
- // turn all exceptions into TableLoadingException
- try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
- org.apache.hadoop.hive.metastore.api.Table msTbl = null;
- // All calls to getTable() need to be serialized due to HIVE-5457.
- synchronized (metastoreAccessLock_) {
- msTbl = msClient.getHiveClient().getTable(db.getName(), tblName);
- }
- // Check that the Hive TableType is supported
- TableType tableType = TableType.valueOf(msTbl.getTableType());
- if (!SUPPORTED_TABLE_TYPES.contains(tableType)) {
- throw new TableLoadingException(String.format(
- "Unsupported table type '%s' for: %s", tableType, fullTblName));
- }
-
- // Create a table of appropriate type and have it load itself
- table = Table.fromMetastoreTable(catalog_.getNextTableId(), db, msTbl);
- if (table == null) {
- throw new TableLoadingException(
- "Unrecognized table type for table: " + fullTblName);
- }
- table.load(false, msClient.getHiveClient(), msTbl);
- table.validate();
- } catch (TableLoadingException e) {
- table = IncompleteTable.createFailedMetadataLoadTable(
- TableId.createInvalidId(), db, tblName, e);
- } catch (NoSuchObjectException e) {
- TableLoadingException tableDoesNotExist = new TableLoadingException(
- "Table " + fullTblName + " no longer exists in the Hive MetaStore. " +
- "Run 'invalidate metadata " + fullTblName + "' to update the Impala " +
- "catalog.");
- table = IncompleteTable.createFailedMetadataLoadTable(
- TableId.createInvalidId(), db, tblName, tableDoesNotExist);
- } catch (Exception e) {
- table = IncompleteTable.createFailedMetadataLoadTable(
- catalog_.getNextTableId(), db, tblName, new TableLoadingException(
- "Failed to load metadata for table: " + fullTblName + ". Running " +
- "'invalidate metadata " + fullTblName + "' may resolve this problem.", e));
- }
- return table;
- }
-}
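Note that load() reports metadata failures through the returned object rather than by
throwing, so callers are expected to check for an IncompleteTable. A minimal sketch of a
call site, assuming a CatalogServiceCatalog 'catalog' and a Db 'db' are already in hand
(the variable names and table name are illustrative, not from this patch):

    // Sketch only: load one table and distinguish success from a failed load.
    TableLoader loader = new TableLoader(catalog);
    Table tbl = loader.load(db, "my_table");
    if (tbl instanceof IncompleteTable) {
      // The metadata load failed; the IncompleteTable wraps the causing exception.
    } else {
      // Fully loaded table (e.g. an HdfsTable or a view), ready for planning.
    }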
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/TableLoadingException.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/TableLoadingException.java b/fe/src/main/java/com/cloudera/impala/catalog/TableLoadingException.java
deleted file mode 100644
index ab55675..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/TableLoadingException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-
-/**
- * Thrown when a table's metadata cannot be loaded due to an error.
- */
-public class TableLoadingException extends CatalogException {
- // Dummy serial UID to avoid Eclipse warnings
- private static final long serialVersionUID = 7760580025530083536L;
-
- public TableLoadingException(String s, Throwable cause) {
- super(s, cause);
- }
-
- public TableLoadingException(String s) {
- super(s);
- }
-};
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/TableLoadingMgr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/TableLoadingMgr.java b/fe/src/main/java/com/cloudera/impala/catalog/TableLoadingMgr.java
deleted file mode 100644
index 57cc513..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/TableLoadingMgr.java
+++ /dev/null
@@ -1,333 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-
-import com.cloudera.impala.thrift.TTableName;
-import com.cloudera.impala.util.HdfsCachingUtil;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-/**
-* Class that manages scheduling the loading of table metadata from the Hive Metastore and
-* the Hadoop NameNode. Loads tables using a pool of table loading threads. New load
-* requests can be submitted using loadAsync(), which will schedule the load when the
-* next thread becomes available. Also manages prioritized background table loading by
-* reading from a deque of table names to determine which table to load next. Tables added
-* to the head of the deque will be loaded before tables added to the tail, so the loading
-* order can be prioritized (see prioritizeLoad()/backgroundLoad()).
-*/
-public class TableLoadingMgr {
- /**
- * Represents the result of an asynchronous Table loading request. Calling
- * get() will block until the Table has completed loading. When finished
- * processing the request, call close() to clean up.
- */
- public class LoadRequest {
- private final Future<Table> tblTask_;
- private final TTableName tblName_;
-
- private LoadRequest(TTableName tblName, Future<Table> tblTask) {
- tblTask_ = tblTask;
- tblName_ = tblName;
- }
-
- /**
- * Blocks until the table has finished loading and returns the result. If any errors
- * were encountered while loading the table, an IncompleteTable will be returned.
- */
- public Table get() {
- Table tbl;
- try {
- tbl = tblTask_.get();
- } catch (Exception e) {
- tbl = IncompleteTable.createFailedMetadataLoadTable(
- TableId.createInvalidId(), catalog_.getDb(tblName_.getDb_name()),
- tblName_.getTable_name(), new TableLoadingException(e.getMessage(), e));
- }
- Preconditions.checkState(tbl.isLoaded());
- return tbl;
- }
-
- /**
- * Cleans up the in-flight load request matching the given table name. Will not
- * cancel the load if it is still in progress, but frees the slot so that another
- * load request for the same table can proceed. Can be called multiple times.
- */
- public void close() {
- synchronized (loadingTables_) {
- if (loadingTables_.get(tblName_) == tblTask_) loadingTables_.remove(tblName_);
- }
- }
- }
-
- private static final Logger LOG = Logger.getLogger(TableLoadingMgr.class);
-
- // A thread safe blocking deque that is used to prioritize the loading of table
- // metadata. The CatalogServer has a background thread that will always add unloaded
- // tables to the tail of the deque. However, a call to prioritizeLoad() will add
- // tables to the head of the deque. The next table to load is always taken from the
- // head of the deque. May contain the same table multiple times, but a second
- // attempt to load the table metadata will be a no-op.
- private final LinkedBlockingDeque<TTableName> tableLoadingDeque_ =
- new LinkedBlockingDeque<TTableName>();
-
- // A thread safe HashSet of table names that are in the tableLoadingDeque_. Used to
- // efficiently check for existence of items in the deque.
- // Updates may lead/lag updates to the tableLoadingDeque_ - they are added to this set
- // immediately before being added to the deque and removed immediately after removing
- // from the deque. The fact that the updates are not synchronized shouldn't impact
- // functionality since this set is only used for efficient lookups.
- private final Set<TTableName> tableLoadingSet_ =
- Collections.synchronizedSet(new HashSet<TTableName>());
-
- // Map of table name to a FutureTask associated with the table load. Used to
- // prevent duplicate loads of the same table.
- private final ConcurrentHashMap<TTableName, FutureTask<Table>> loadingTables_ =
- new ConcurrentHashMap<TTableName, FutureTask<Table>>();
-
- // Map of table name to the cache directives that are being waited on for that table.
- // Once all directives have completed, the table's metadata will be refreshed and
- // the table will be removed from this map.
- // A caching operation may take a long time to complete, so to maximize query
- // throughput it is preferable to allow the user to continue to run queries against
- // the table while a cache request completes in the background.
- private final Map<TTableName, List<Long>> pendingTableCacheDirs_ = Maps.newHashMap();
-
- // The number of parallel threads to use to load table metadata. Should be set to a
- // value that provides good throughput while not putting too much stress on the
- // metastore.
- private final int numLoadingThreads_;
-
- // Pool of numLoadingThreads_ threads that loads table metadata. If additional tasks
- // are submitted to the pool after it is full, they will be queued and executed when
- // the next thread becomes available. There is no hard upper limit on the number of
- // pending tasks (no work will be rejected, but memory consumption is unbounded).
- private final ExecutorService tblLoadingPool_;
-
- // Thread that incrementally refreshes tables in the background. Used to update a
- // table's metadata after a long running operation completes, such as marking a
- // table as cached. There is no hard upper limit on the number of pending tasks
- // (no work will be rejected, but memory consumption is unbounded). If this thread
- // dies it will be automatically restarted.
- // The tables to process are read from the refreshThreadWork_ queue.
- ExecutorService asyncRefreshThread_ = Executors.newSingleThreadExecutor();
-
- // Tables for the async refresh thread to process. Synchronization must be handled
- // externally.
- private final LinkedBlockingQueue<TTableName> refreshThreadWork_ =
- new LinkedBlockingQueue<TTableName>();
-
- private final CatalogServiceCatalog catalog_;
- private final TableLoader tblLoader_;
-
- public TableLoadingMgr(CatalogServiceCatalog catalog, int numLoadingThreads) {
- catalog_ = catalog;
- tblLoader_ = new TableLoader(catalog_);
- numLoadingThreads_ = numLoadingThreads;
- tblLoadingPool_ = Executors.newFixedThreadPool(numLoadingThreads_);
-
- // Start the background table loading threads.
- startTableLoadingThreads();
-
- // Start the asyncRefreshThread_. Currently used to wait for cache directives to
- // complete in the background.
- asyncRefreshThread_.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- while(true) {
- execAsyncRefreshWork(refreshThreadWork_.take());
- }
- }});
- }
-
- /**
- * Prioritizes the loading of the given table.
- */
- public void prioritizeLoad(TTableName tblName) {
- tableLoadingSet_.add(tblName);
- tableLoadingDeque_.offerFirst(tblName);
- }
-
- /**
- * Submits a single table for background (low priority) loading.
- */
- public void backgroundLoad(TTableName tblName) {
- // Only queue for background loading if the table doesn't already exist
- // in the table loading set.
- if (tableLoadingSet_.add(tblName)) {
- tableLoadingDeque_.offerLast(tblName);
- }
- }
-
- /**
- * Adds a list of cache directive IDs to watch for the given table name.
- * The asyncRefreshThread_ will process the cache directives and once all directives
- * complete (data has been cached or no progress is being made), the
- * asyncRefreshThread_ will refresh the table metadata. After processing the
- * request, the watch will be deleted.
- */
- public void watchCacheDirs(List<Long> cacheDirIds, final TTableName tblName) {
- synchronized (pendingTableCacheDirs_) {
- // A single table may have multiple pending cache requests since one request
- // gets submitted per-partition.
- List<Long> existingCacheReqIds = pendingTableCacheDirs_.get(tblName);
- if (existingCacheReqIds == null) {
- existingCacheReqIds = cacheDirIds;
- pendingTableCacheDirs_.put(tblName, cacheDirIds);
- refreshThreadWork_.add(tblName);
- } else {
- existingCacheReqIds.addAll(cacheDirIds);
- }
- }
- }
-
- /**
- * Loads a table asynchronously, returning a LoadRequest that can be used to get
- * the result (a Table). If there is already a load in flight for this table name,
- * the same underlying loading task (Future) will be used, helping to prevent duplicate
- * loads of the same table.
- */
- public LoadRequest loadAsync(final TTableName tblName)
- throws DatabaseNotFoundException {
- final Db parentDb = catalog_.getDb(tblName.getDb_name());
- if (parentDb == null) {
- throw new DatabaseNotFoundException(
- "Database '" + tblName.getDb_name() + "' was not found.");
- }
-
- FutureTask<Table> tableLoadTask = new FutureTask<Table>(new Callable<Table>() {
- @Override
- public Table call() throws Exception {
- return tblLoader_.load(parentDb, tblName.table_name);
- }});
-
- FutureTask<Table> existingValue = loadingTables_.putIfAbsent(tblName, tableLoadTask);
- if (existingValue == null) {
- // There was no existing value, submit a new load request.
- tblLoadingPool_.execute(tableLoadTask);
- } else {
- tableLoadTask = existingValue;
- }
- return new LoadRequest(tblName, tableLoadTask);
- }
-
- /**
- * Starts table loading threads in a fixed-size thread pool whose size is
- * defined by numLoadingThreads_. Each thread polls the tableLoadingDeque_
- * for new tables to load.
- */
- private void startTableLoadingThreads() {
- ExecutorService loadingPool = Executors.newFixedThreadPool(numLoadingThreads_);
- try {
- for (int i = 0; i < numLoadingThreads_; ++i) {
- loadingPool.execute(new Runnable() {
- @Override
- public void run() {
- while (true) {
- try {
- loadNextTable();
- } catch (Exception e) {
- LOG.error("Error loading table: ", e);
- // Ignore exception.
- }
- }
- }
- });
- }
- } finally {
- loadingPool.shutdown();
- }
- }
-
- /**
- * Gets the next table name to load off the head of the table loading queue. If
- * the queue is empty, this will block until a new table is added.
- */
- private void loadNextTable() throws InterruptedException {
- // Always get the next table from the head of the deque.
- final TTableName tblName = tableLoadingDeque_.takeFirst();
- tableLoadingSet_.remove(tblName);
- LOG.debug("Loading next table. Remaining items in queue: "
- + tableLoadingDeque_.size());
- try {
- // TODO: Instead of calling "getOrLoad" here we could call "loadAsync". We would
- // just need to add a mechanism for moving loaded tables into the Catalog.
- catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name());
- } catch (CatalogException e) {
- // Ignore.
- }
- }
-
- /**
- * Executes all async refresh work for the specified table name.
- */
- private void execAsyncRefreshWork(TTableName tblName) {
- if (!waitForCacheDirs(tblName)) return;
- try {
- // Reload the table metadata to pickup the new cached block location information.
- catalog_.reloadTable(tblName);
- } catch (CatalogException e) {
- LOG.error("Error reloading cached table: ", e);
- }
- }
-
- /**
- * Waits for all pending cache directives on a table to complete.
- * Returns true if a refresh is needed and false if a refresh is not needed.
- */
- private boolean waitForCacheDirs(TTableName tblName) {
- boolean isRefreshNeeded = false;
- // Keep processing cache directives for this table until there are none left.
- while (true) {
- // Get all pending requests for this table.
- List<Long> cacheDirIds = null;
- synchronized (pendingTableCacheDirs_) {
- cacheDirIds = pendingTableCacheDirs_.remove(tblName);
- }
- if (cacheDirIds == null || cacheDirIds.size() == 0) return isRefreshNeeded;
- isRefreshNeeded = true;
-
- // Wait for each cache request to complete.
- for (Long dirId: cacheDirIds) {
- if (dirId == null) continue;
- try {
- HdfsCachingUtil.waitForDirective(dirId);
- } catch (Exception e) {
- LOG.error(String.format(
- "Error waiting for cache request %d to complete: ", dirId), e);
- }
- }
- }
- }
-}
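To make the LoadRequest lifecycle described above concrete: get() blocks for the result
and close() releases the entry in loadingTables_ so a later load of the same table is not
pinned to a stale request. A minimal usage sketch, where the manager instance and the
database/table names are hypothetical:

    // Sketch only: asynchronous load with explicit cleanup of the request.
    TTableName tblName = new TTableName("some_db", "some_table");
    try {
      TableLoadingMgr.LoadRequest request = tableLoadingMgr.loadAsync(tblName);
      try {
        Table tbl = request.get();  // Blocks; returns an IncompleteTable on failure.
        // ... use tbl ...
      } finally {
        request.close();  // Releases the in-flight slot; safe to call repeatedly.
      }
    } catch (DatabaseNotFoundException e) {
      // The parent database does not exist in the catalog.
    }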
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/TableNotFoundException.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/TableNotFoundException.java b/fe/src/main/java/com/cloudera/impala/catalog/TableNotFoundException.java
deleted file mode 100644
index 26686d1..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/TableNotFoundException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-
-/**
- * Thrown when a table cannot be found in the catalog.
- */
-public class TableNotFoundException extends CatalogException {
- // Dummy serial UID to avoid Eclipse warnings
- private static final long serialVersionUID = -2203080667446640542L;
-
- public TableNotFoundException(String s) { super(s); }
-
- public TableNotFoundException(String s, Exception cause) { super(s, cause); }
-}
\ No newline at end of file