Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:40 UTC
[23/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java b/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java
deleted file mode 100644
index 242c704..0000000
--- a/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java
+++ /dev/null
@@ -1,643 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.hive.executor;
-
-import java.io.File;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-
-import org.apache.hadoop.hive.ql.exec.UDF;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
-import org.apache.thrift.protocol.TBinaryProtocol;
-
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.catalog.PrimitiveType;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.common.JniUtil;
-import com.cloudera.impala.thrift.THiveUdfExecutorCtorParams;
-import com.cloudera.impala.thrift.TPrimitiveType;
-import com.cloudera.impala.util.UnsafeUtil;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-// Wrapper object to run Hive UDFs. This class works with UdfCallExpr in the
-// backend to marshal data back and forth between the execution engine and
-// the Java UDF class.
-// See the comments in be/src/exprs/hive-udf-call.h for more details.
-// TODO: should we cache loaded jars and classes?
-@SuppressWarnings("restriction")
-public class UdfExecutor {
- private static final Logger LOG = Logger.getLogger(UdfExecutor.class);
- // By convention, the function in the class must be called evaluate()
- public static final String UDF_FUNCTION_NAME = "evaluate";
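- // For reference, an editorial sketch (not part of this file) of a minimal Hive
- // UDF that follows this convention:
- //
- //   public class ToUpper extends UDF {
- //     public Text evaluate(Text input) {
- //       if (input == null) return null;
- //       return new Text(input.toString().toUpperCase());
- //     }
- //   }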
-
- // Object to deserialize ctor params from BE.
- private final static TBinaryProtocol.Factory PROTOCOL_FACTORY =
- new TBinaryProtocol.Factory();
-
- private UDF udf_;
- private Method method_;
-
- // Return and argument types of the function inferred from the udf method signature.
- // The JavaUdfDataType enum maps it to corresponding primitive type.
- private JavaUdfDataType[] argTypes_;
- private JavaUdfDataType retType_;
-
- // Input buffer from the backend. This is valid for the duration of an evaluate() call.
- // These buffers are allocated in the BE.
- private final long inputBufferPtr_;
- private final long inputNullsPtr_;
-
- // These are the byte offsets in inputBufferPtr_ to the start of each input
- // argument, i.e. inputBufferPtr_ + inputBufferOffsets_[i] is the address of
- // the ith input argument.
- private final int[] inputBufferOffsets_;
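- // For example (editorial sketch): for arguments (BIGINT, INT), the BE might
- // pass offsets {0, 8}, so the BIGINT starts at inputBufferPtr_ + 0 and the
- // INT at inputBufferPtr_ + 8.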
-
- // Output buffer to return non-string values. These buffers are allocated in the BE.
- private final long outputBufferPtr_;
- private final long outputNullPtr_;
-
- // For StringValue return types, outputBufferPtr_ is the location of the 16-byte
- // StringValue object. StringValue.ptr is set to outBufferStringPtr_. This buffer
- // grows as necessary to fit the return string.
- // It is allocated by the FE.
- private long outBufferStringPtr_;
-
- // Size of outBufferStringPtr_.
- private int outBufferCapacity_;
-
- // Preconstructed input objects for the UDF. This minimizes object creation overhead
- // as these objects are reused across calls to evaluate().
- private Object[] inputObjects_;
- private Object[] inputArgs_; // inputArgs_[i] is either inputObjects_[i] or null
-
- // Data types that are supported as return or argument types in Java UDFs.
- public enum JavaUdfDataType {
- INVALID_TYPE("INVALID_TYPE", TPrimitiveType.INVALID_TYPE),
- BOOLEAN("BOOLEAN", TPrimitiveType.BOOLEAN),
- BOOLEAN_WRITABLE("BOOLEAN_WRITABLE", TPrimitiveType.BOOLEAN),
- TINYINT("TINYINT", TPrimitiveType.TINYINT),
- BYTE_WRITABLE("BYTE_WRITABLE", TPrimitiveType.TINYINT),
- SMALLINT("SMALLINT", TPrimitiveType.SMALLINT),
- SHORT_WRITABLE("SHORT_WRITABLE", TPrimitiveType.SMALLINT),
- INT("INT", TPrimitiveType.INT),
- INT_WRITABLE("INT_WRITABLE", TPrimitiveType.INT),
- BIGINT("BIGINT", TPrimitiveType.BIGINT),
- LONG_WRITABLE("LONG_WRITABLE", TPrimitiveType.BIGINT),
- FLOAT("FLOAT", TPrimitiveType.FLOAT),
- FLOAT_WRITABLE("FLOAT_WRITABLE", TPrimitiveType.FLOAT),
- DOUBLE("DOUBLE", TPrimitiveType.DOUBLE),
- DOUBLE_WRITABLE("DOUBLE", TPrimitiveType.DOUBLE),
- STRING("STRING", TPrimitiveType.STRING),
- TEXT("TEXT", TPrimitiveType.STRING),
- BYTES_WRITABLE("BYTES_WRITABLE", TPrimitiveType.STRING),
- BYTE_ARRAY("BYTE_ARRAY", TPrimitiveType.STRING);
-
- private final String description_;
- private final TPrimitiveType thriftType_;
-
- private JavaUdfDataType(String description, TPrimitiveType thriftType) {
- description_ = description;
- thriftType_ = thriftType;
- }
-
- @Override
- public String toString() { return description_; }
-
- public TPrimitiveType getPrimitiveType() { return thriftType_; }
-
- public static JavaUdfDataType getType(Class<?> c) {
- if (c == BooleanWritable.class) {
- return JavaUdfDataType.BOOLEAN_WRITABLE;
- } else if (c == boolean.class || c == Boolean.class) {
- return JavaUdfDataType.BOOLEAN;
- } else if (c == ByteWritable.class) {
- return JavaUdfDataType.BYTE_WRITABLE;
- } else if (c == byte.class || c == Byte.class) {
- return JavaUdfDataType.TINYINT;
- } else if (c == ShortWritable.class) {
- return JavaUdfDataType.SHORT_WRITABLE;
- } else if (c == short.class || c == Short.class) {
- return JavaUdfDataType.SMALLINT;
- } else if (c == IntWritable.class) {
- return JavaUdfDataType.INT_WRITABLE;
- } else if (c == int.class || c == Integer.class) {
- return JavaUdfDataType.INT;
- } else if (c == LongWritable.class) {
- return JavaUdfDataType.LONG_WRITABLE;
- } else if (c == long.class || c == Long.class) {
- return JavaUdfDataType.BIGINT;
- } else if (c == FloatWritable.class) {
- return JavaUdfDataType.FLOAT_WRITABLE;
- } else if (c == float.class || c == Float.class) {
- return JavaUdfDataType.FLOAT;
- } else if (c == DoubleWritable.class) {
- return JavaUdfDataType.DOUBLE_WRITABLE;
- } else if (c == double.class || c == Double.class) {
- return JavaUdfDataType.DOUBLE;
- } else if (c == byte[].class) {
- return JavaUdfDataType.BYTE_ARRAY;
- } else if (c == BytesWritable.class) {
- return JavaUdfDataType.BYTES_WRITABLE;
- } else if (c == Text.class) {
- return JavaUdfDataType.TEXT;
- } else if (c == String.class) {
- return JavaUdfDataType.STRING;
- }
- return JavaUdfDataType.INVALID_TYPE;
- }
-
- public static boolean isSupported(Type t) {
- for (JavaUdfDataType javaType: JavaUdfDataType.values()) {
- if (javaType == JavaUdfDataType.INVALID_TYPE) continue;
- if (javaType.getPrimitiveType() == t.getPrimitiveType().toThrift()) {
- return true;
- }
- }
- return false;
- }
- }
-
- /**
- * Create a UdfExecutor, using parameters from a serialized thrift object. Used by
- * the backend.
- */
- public UdfExecutor(byte[] thriftParams) throws ImpalaException {
- THiveUdfExecutorCtorParams request = new THiveUdfExecutorCtorParams();
- JniUtil.deserializeThrift(PROTOCOL_FACTORY, request, thriftParams);
-
- String className = request.fn.scalar_fn.symbol;
- String jarFile = request.local_location;
- Type retType = Type.fromThrift(request.fn.ret_type);
- Type[] parameterTypes = new Type[request.fn.arg_types.size()];
- for (int i = 0; i < request.fn.arg_types.size(); ++i) {
- parameterTypes[i] = Type.fromThrift(request.fn.arg_types.get(i));
- }
- inputBufferPtr_ = request.input_buffer_ptr;
- inputNullsPtr_ = request.input_nulls_ptr;
- outputBufferPtr_ = request.output_buffer_ptr;
- outputNullPtr_ = request.output_null_ptr;
- outBufferStringPtr_ = 0;
- outBufferCapacity_ = 0;
- inputBufferOffsets_ = new int[request.input_byte_offsets.size()];
- for (int i = 0; i < request.input_byte_offsets.size(); ++i) {
- inputBufferOffsets_[i] = request.input_byte_offsets.get(i).intValue();
- }
-
- init(jarFile, className, retType, parameterTypes);
- }
-
- @Override
- protected void finalize() throws Throwable {
- close();
- super.finalize();
- }
-
- /**
- * Releases any resources allocated off the native heap.
- */
- public void close() {
- UnsafeUtil.UNSAFE.freeMemory(outBufferStringPtr_);
- outBufferStringPtr_ = 0;
- outBufferCapacity_ = 0;
- }
-
- /**
-  * Evaluate method called by the backend. The inputs to the UDF have been
-  * serialized into the input buffers allocated by the BE.
-  */
- public void evaluate() throws ImpalaRuntimeException {
- try {
- for (int i = 0; i < argTypes_.length; ++i) {
- if (UnsafeUtil.UNSAFE.getByte(inputNullsPtr_ + i) == 0) {
- switch (argTypes_[i]) {
- case BOOLEAN_WRITABLE:
- case BYTE_WRITABLE:
- case SHORT_WRITABLE:
- case INT_WRITABLE:
- case LONG_WRITABLE:
- case FLOAT_WRITABLE:
- case DOUBLE_WRITABLE:
- case BYTE_ARRAY:
- case BYTES_WRITABLE:
- case TEXT:
- inputArgs_[i] = inputObjects_[i];
- break;
- case BOOLEAN:
- inputArgs_[i] = ((ImpalaBooleanWritable)inputObjects_[i]).get();
- break;
- case TINYINT:
- inputArgs_[i] = ((ImpalaTinyIntWritable)inputObjects_[i]).get();
- break;
- case SMALLINT:
- inputArgs_[i] = ((ImpalaSmallIntWritable)inputObjects_[i]).get();
- break;
- case INT:
- inputArgs_[i] = ((ImpalaIntWritable)inputObjects_[i]).get();
- break;
- case BIGINT:
- inputArgs_[i] = ((ImpalaBigIntWritable)inputObjects_[i]).get();
- break;
- case FLOAT:
- inputArgs_[i] = ((ImpalaFloatWritable)inputObjects_[i]).get();
- break;
- case DOUBLE:
- inputArgs_[i] = ((ImpalaDoubleWritable)inputObjects_[i]).get();
- break;
- case STRING:
- Preconditions.checkState(inputObjects_[i] instanceof ImpalaBytesWritable);
- inputArgs_[i] =
- new String(((ImpalaBytesWritable)inputObjects_[i]).getBytes());
- break;
- }
- } else {
- inputArgs_[i] = null;
- }
- }
- evaluate(inputArgs_);
- } catch (Exception e) {
- e.printStackTrace(System.err);
- throw new ImpalaRuntimeException("UDF::evaluate() ran into a problem.", e);
- }
- }
-
- /**
- * Evaluates the UDF with 'args' as the input to the UDF. This is exposed
- * for testing and is not the version of evaluate() the backend uses.
- */
- public long evaluateForTesting(Object... args) throws ImpalaRuntimeException {
- try {
- Object[] inputArgs = new Object[args.length];
- for (int i = 0; i < args.length; ++i) {
- switch (argTypes_[i]) {
- case BOOLEAN_WRITABLE:
- case BYTE_WRITABLE:
- case SHORT_WRITABLE:
- case INT_WRITABLE:
- case LONG_WRITABLE:
- case FLOAT_WRITABLE:
- case DOUBLE_WRITABLE:
- case TEXT:
- case BYTE_ARRAY:
- case BYTES_WRITABLE:
- case STRING:
- inputArgs[i] = args[i];
- break;
- case BOOLEAN:
- inputArgs[i] = ((ImpalaBooleanWritable)args[i]).get();
- break;
- case TINYINT:
- inputArgs[i] = ((ImpalaTinyIntWritable)args[i]).get();
- break;
- case SMALLINT:
- inputArgs[i] = ((ImpalaSmallIntWritable)args[i]).get();
- break;
- case INT:
- inputArgs[i] = ((ImpalaIntWritable)args[i]).get();
- break;
- case BIGINT:
- inputArgs[i] = ((ImpalaBigIntWritable)args[i]).get();
- break;
- case FLOAT:
- inputArgs[i] = ((ImpalaFloatWritable)args[i]).get();
- break;
- case DOUBLE:
- inputArgs[i] = ((ImpalaDoubleWritable)args[i]).get();
- break;
- }
- }
- return evaluate(inputArgs);
- } catch (Exception e) {
- e.printStackTrace(System.err);
- throw new ImpalaRuntimeException("UDF::evaluate() ran into a problem.", e);
- }
- }
-
- /**
- * Evaluates the UDF with 'args' as the input to the UDF.
- * Returns 0 if the UDF returned NULL (the result is a pointer, so 0 is unambiguous).
- */
- private long evaluate(Object... args) throws ImpalaRuntimeException {
- try {
- storeUdfResult(method_.invoke(udf_, args));
- if (UnsafeUtil.UNSAFE.getByte(outputNullPtr_) == 1) return 0;
- return outputBufferPtr_;
- } catch (IllegalArgumentException | IllegalAccessException
- | InvocationTargetException e) {
- throw new ImpalaRuntimeException("UDF failed to evaluate", e);
- }
- }
-
- public Method getMethod() { return method_; }
-
- // Stores the result object 'obj' into outputBufferPtr_.
- private void storeUdfResult(Object obj) throws ImpalaRuntimeException {
- if (obj == null) {
- UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)1);
- return;
- }
-
- UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)0);
- switch (retType_) {
- case BOOLEAN_WRITABLE: {
- BooleanWritable val = (BooleanWritable)obj;
- UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get() ? (byte)1 : 0);
- return;
- }
- case BOOLEAN: {
- UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, (boolean)obj ? (byte)1 : 0);
- return;
- }
- case BYTE_WRITABLE: {
- ByteWritable val = (ByteWritable)obj;
- UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get());
- return;
- }
- case TINYINT: {
- UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, (byte)obj);
- return;
- }
- case SHORT_WRITABLE: {
- ShortWritable val = (ShortWritable)obj;
- UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, val.get());
- return;
- }
- case SMALLINT: {
- UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, (short)obj);
- return;
- }
- case INT_WRITABLE: {
- IntWritable val = (IntWritable)obj;
- UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, val.get());
- return;
- }
- case INT: {
- UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, (int)obj);
- return;
- }
- case LONG_WRITABLE: {
- LongWritable val = (LongWritable)obj;
- UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, val.get());
- return;
- }
- case BIGINT: {
- UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, (long)obj);
- return;
- }
- case FLOAT_WRITABLE: {
- FloatWritable val = (FloatWritable)obj;
- UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, val.get());
- return;
- }
- case FLOAT: {
- UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, (float)obj);
- return;
- }
- case DOUBLE_WRITABLE: {
- DoubleWritable val = (DoubleWritable)obj;
- UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, val.get());
- return;
- }
- case DOUBLE: {
- UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, (double)obj);
- return;
- }
- case TEXT: {
- copyBytesToOutputBuffer(((Text)obj).copyBytes());
- return;
- }
- case BYTE_ARRAY: {
- copyBytesToOutputBuffer((byte[]) obj);
- return;
- }
- case BYTES_WRITABLE: {
- copyBytesToOutputBuffer(((BytesWritable)obj).copyBytes());
- return;
- }
- case STRING: {
- copyBytesToOutputBuffer(((String)obj).getBytes());
- return;
- }
- default:
- throw new ImpalaRuntimeException("Unsupported return type: " + retType_);
- }
- }
-
- private void copyBytesToOutputBuffer(byte[] bytes) {
- if (bytes.length > outBufferCapacity_) {
- outBufferStringPtr_ =
- UnsafeUtil.UNSAFE.reallocateMemory(outBufferStringPtr_, bytes.length);
- outBufferCapacity_ = bytes.length;
- UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, outBufferStringPtr_);
- }
- UnsafeUtil.Copy(outBufferStringPtr_, bytes, 0, bytes.length);
- UnsafeUtil.UNSAFE.putInt(
- outputBufferPtr_ + ImpalaStringWritable.STRING_VALUE_LEN_OFFSET,
- bytes.length);
- }
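-
- // Editorial sketch of the StringValue layout assumed by copyBytesToOutputBuffer()
- // (offsets come from ImpalaStringWritable, not hard-coded here):
- //   outputBufferPtr_ + 0:                        int64 ptr -> outBufferStringPtr_
- //   outputBufferPtr_ + STRING_VALUE_LEN_OFFSET:  int32 len (byte length of the string)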
-
- // Preallocate the input objects that will be passed to the underlying UDF.
- // These objects are allocated once and reused across calls to evaluate()
- private void allocateInputObjects() throws ImpalaRuntimeException {
- inputObjects_ = new Writable[argTypes_.length];
- inputArgs_ = new Object[argTypes_.length];
-
- for (int i = 0; i < argTypes_.length; ++i) {
- int offset = inputBufferOffsets_[i];
- switch (argTypes_[i]) {
- case BOOLEAN:
- case BOOLEAN_WRITABLE:
- inputObjects_[i] = new ImpalaBooleanWritable(inputBufferPtr_ + offset);
- break;
- case TINYINT:
- case BYTE_WRITABLE:
- inputObjects_[i] = new ImpalaTinyIntWritable(inputBufferPtr_ + offset);
- break;
- case SMALLINT:
- case SHORT_WRITABLE:
- inputObjects_[i] = new ImpalaSmallIntWritable(inputBufferPtr_ + offset);
- break;
- case INT:
- case INT_WRITABLE:
- inputObjects_[i] = new ImpalaIntWritable(inputBufferPtr_ + offset);
- break;
- case BIGINT:
- case LONG_WRITABLE:
- inputObjects_[i] = new ImpalaBigIntWritable(inputBufferPtr_ + offset);
- break;
- case FLOAT:
- case FLOAT_WRITABLE:
- inputObjects_[i] = new ImpalaFloatWritable(inputBufferPtr_ + offset);
- break;
- case DOUBLE:
- case DOUBLE_WRITABLE:
- inputObjects_[i] = new ImpalaDoubleWritable(inputBufferPtr_ + offset);
- break;
- case TEXT:
- inputObjects_[i] = new ImpalaTextWritable(inputBufferPtr_ + offset);
- break;
- case BYTES_WRITABLE:
- inputObjects_[i] = new ImpalaBytesWritable(inputBufferPtr_ + offset);
- break;
- case STRING:
- // String can be mapped to any String-like Writable class.
- inputObjects_[i] = new ImpalaBytesWritable(inputBufferPtr_ + offset);
- break;
- default:
- throw new ImpalaRuntimeException("Unsupported argument type: " + argTypes_[i]);
- }
- }
- }
-
- private ClassLoader getClassLoader(String jarPath) throws MalformedURLException {
- if (jarPath == null) {
- return ClassLoader.getSystemClassLoader();
- } else {
- URL url = new File(jarPath).toURI().toURL();
- return URLClassLoader.newInstance(new URL[] { url }, getClass().getClassLoader());
- }
- }
-
- /**
- * Sets the return type of a Java UDF. Returns true if the return type is compatible
- * with the return type from the function definition. Throws an ImpalaRuntimeException
- * if the return type is not supported.
- */
- private boolean setReturnType(Type retType, Class<?> udfReturnType)
- throws ImpalaRuntimeException {
- if (!JavaUdfDataType.isSupported(retType)) {
- throw new ImpalaRuntimeException("Unsupported return type: " + retType.toSql());
- }
- JavaUdfDataType javaType = JavaUdfDataType.getType(udfReturnType);
- // Check if the evaluate method return type is compatible with the return type from
- // the function definition. This happens when both of them map to the same primitive
- // type.
- if (retType.getPrimitiveType().toThrift() != javaType.getPrimitiveType()) {
- return false;
- }
- retType_ = javaType;
- return true;
- }
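-
- // Editorial example: for a function declared to return INT, an evaluate()
- // returning int/Integer (INT) or IntWritable (INT_WRITABLE) is compatible,
- // since both map to TPrimitiveType.INT.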
-
- /**
- * Sets the argument types of a Java UDF. Returns true if the argument types specified
- * in the UDF are compatible with the argument types of the evaluate() function loaded
- * from the associated JAR file.
- */
- private boolean setArgTypes(Type[] parameterTypes, Class<?>[] udfArgTypes) {
- Preconditions.checkNotNull(argTypes_);
- for (int i = 0; i < udfArgTypes.length; ++i) {
- argTypes_[i] = JavaUdfDataType.getType(udfArgTypes[i]);
- if (argTypes_[i].getPrimitiveType()
- != parameterTypes[i].getPrimitiveType().toThrift()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Initializes the UdfExecutor, validating that the UDF has the proper signature.
- * This uses reflection to look up the "evaluate" function in the UDF class.
- */
- private void init(String jarPath, String udfPath,
- Type retType, Type... parameterTypes) throws
- ImpalaRuntimeException {
- ArrayList<String> signatures = Lists.newArrayList();
- try {
- LOG.debug("Loading UDF '" + udfPath + "' from " + jarPath);
- ClassLoader loader = getClassLoader(jarPath);
- Class<?> c = Class.forName(udfPath, true, loader);
- Class<? extends UDF> udfClass = c.asSubclass(UDF.class);
- Constructor<? extends UDF> ctor = udfClass.getConstructor();
- udf_ = ctor.newInstance();
- argTypes_ = new JavaUdfDataType[parameterTypes.length];
- Method[] methods = udfClass.getMethods();
- for (Method m: methods) {
- // By convention, the udf must contain the function "evaluate"
- if (!m.getName().equals(UDF_FUNCTION_NAME)) continue;
- signatures.add(m.toGenericString());
- Class<?>[] methodTypes = m.getParameterTypes();
-
- // Try to match the arguments
- if (methodTypes.length != parameterTypes.length) continue;
- if (methodTypes.length == 0 && parameterTypes.length == 0) {
- // Special case where the UDF doesn't take any input args
- if (!setReturnType(retType, m.getReturnType())) continue;
- setArgTypes(parameterTypes, methodTypes);
- method_ = m;
- LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath);
- return;
- }
-
- if (!setReturnType(retType, m.getReturnType())) continue;
- if (!setArgTypes(parameterTypes, methodTypes)) continue;
- method_ = m;
- allocateInputObjects();
- LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath);
- return;
- }
-
- StringBuilder sb = new StringBuilder();
- sb.append("Unable to find evaluate function with the correct signature: ")
- .append(udfPath + ".evaluate(")
- .append(Joiner.on(", ").join(parameterTypes))
- .append(")\n")
- .append("UDF contains: \n ")
- .append(Joiner.on("\n ").join(signatures));
- throw new ImpalaRuntimeException(sb.toString());
- } catch (MalformedURLException e) {
- throw new ImpalaRuntimeException("Unable load jar.", e);
- } catch (SecurityException e) {
- throw new ImpalaRuntimeException("Unable to load function.", e);
- } catch (ClassNotFoundException e) {
- throw new ImpalaRuntimeException("Unable to find class.", e);
- } catch (NoSuchMethodException e) {
- throw new ImpalaRuntimeException(
- "Unable to find constructor with no arguments.", e);
- } catch (IllegalArgumentException e) {
- throw new ImpalaRuntimeException(
- "Unable to call UDF constructor with no arguments.", e);
- } catch (InstantiationException | IllegalAccessException
- | InvocationTargetException e) {
- throw new ImpalaRuntimeException("Unable to create UDF instance.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java b/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java
deleted file mode 100644
index f6bf8a0..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java
+++ /dev/null
@@ -1,292 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.AggregateInfo;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.FunctionCallExpr;
-import com.cloudera.impala.analysis.SlotId;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.thrift.TAggregationNode;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TExpr;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Aggregation computation.
- *
- */
-public class AggregationNode extends PlanNode {
- private final static Logger LOG = LoggerFactory.getLogger(AggregationNode.class);
-
- // Default per-host memory requirement used if no valid stats are available.
- // TODO: Come up with a more useful heuristic.
- private final static long DEFAULT_PER_HOST_MEM = 128L * 1024L * 1024L;
-
- // Conservative minimum size of hash table for low-cardinality aggregations.
- private final static long MIN_HASH_TBL_MEM = 10L * 1024L * 1024L;
-
- private final AggregateInfo aggInfo_;
-
- // Set to true if this aggregation node needs to run the Finalize step. This
- // node is the root node of a distributed aggregation.
- private boolean needsFinalize_;
-
- // If true, use streaming preaggregation algorithm. Not valid if this is a merge agg.
- private boolean useStreamingPreagg_;
-
- /**
- * Create an agg node from aggInfo.
- */
- public AggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo) {
- super(id, aggInfo.getOutputTupleId().asList(), "AGGREGATE");
- aggInfo_ = aggInfo;
- children_.add(input);
- needsFinalize_ = true;
- }
-
- /**
- * Copy c'tor used in clone().
- */
- private AggregationNode(PlanNodeId id, AggregationNode src) {
- super(id, src, "AGGREGATE");
- aggInfo_ = src.aggInfo_;
- needsFinalize_ = src.needsFinalize_;
- }
-
- public AggregateInfo getAggInfo() { return aggInfo_; }
-
- /**
- * Unsets this node as requiring finalize. Only valid to call this if it is
- * currently marked as needing finalize.
- */
- public void unsetNeedsFinalize() {
- Preconditions.checkState(needsFinalize_);
- needsFinalize_ = false;
- }
-
- /**
- * Sets this node as a preaggregation. Only valid to call this if it is not marked
- * as a preaggregation.
- */
- public void setIsPreagg(PlannerContext ctx) {
- TQueryOptions queryOptions = ctx.getQueryOptions();
- useStreamingPreagg_ = !queryOptions.disable_streaming_preaggregations &&
- aggInfo_.getGroupingExprs().size() > 0;
- }
-
- /**
- * Have this node materialize the aggregation's intermediate tuple instead of
- * the output tuple.
- */
- public void setIntermediateTuple() {
- Preconditions.checkState(!tupleIds_.isEmpty());
- Preconditions.checkState(tupleIds_.get(0).equals(aggInfo_.getOutputTupleId()));
- tupleIds_.clear();
- tupleIds_.add(aggInfo_.getIntermediateTupleId());
- }
-
- @Override
- public boolean isBlockingNode() { return !useStreamingPreagg_; }
-
- @Override
- public void init(Analyzer analyzer) throws InternalException {
- // Assign predicates to the top-most agg in the single-node plan that can evaluate
- // them, as follows: For non-distinct aggs place them in the 1st phase agg node. For
- // distinct aggs place them in the 2nd phase agg node. The conjuncts are
- // transferred to the proper place in the multi-node plan via transferConjuncts().
- if (tupleIds_.get(0).equals(aggInfo_.getResultTupleId()) && !aggInfo_.isMerge()) {
- // Ignore predicates bound by a grouping slot produced by a SlotRef grouping expr.
- // Those predicates are already evaluated below this agg node (e.g., in a scan),
- // because the grouping slot must be in the same equivalence class as another slot
- // below this agg node. We must not ignore other grouping slots in order to retain
- // conjuncts bound by those grouping slots in createEquivConjuncts() (IMPALA-2089).
- // Those conjuncts cannot be redundant because our equivalence classes do not
- // capture dependencies with non-SlotRef exprs.
- Set<SlotId> groupBySlots = Sets.newHashSet();
- for (int i = 0; i < aggInfo_.getGroupingExprs().size(); ++i) {
- if (aggInfo_.getGroupingExprs().get(i).unwrapSlotRef(true) == null) continue;
- groupBySlots.add(aggInfo_.getOutputTupleDesc().getSlots().get(i).getId());
- }
- ArrayList<Expr> bindingPredicates =
- analyzer.getBoundPredicates(tupleIds_.get(0), groupBySlots, true);
- conjuncts_.addAll(bindingPredicates);
-
- // also add remaining unassigned conjuncts_
- assignConjuncts(analyzer);
-
- analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_, groupBySlots);
- }
- conjuncts_ = orderConjunctsByCost(conjuncts_);
- // Compute the mem layout for both tuples here for simplicity.
- aggInfo_.getOutputTupleDesc().computeMemLayout();
- aggInfo_.getIntermediateTupleDesc().computeMemLayout();
-
- // do this at the end so it can take all conjuncts into account
- computeStats(analyzer);
-
- // don't call createDefaultSMap(), it would point our conjuncts (= Having clause)
- // to our input; our conjuncts don't get substituted because they already
- // refer to our output
- outputSmap_ = getCombinedChildSmap();
- aggInfo_.substitute(outputSmap_, analyzer);
- // assert consistent aggregate expr and slot materialization
- aggInfo_.checkConsistency();
- }
-
- @Override
- public void computeStats(Analyzer analyzer) {
- super.computeStats(analyzer);
- // This is prone to overflow, because we keep multiplying cardinalities,
- // even if the grouping exprs are functionally dependent (example:
- // group by the primary key of a table plus a number of other columns from that
- // same table)
- // TODO: try to recognize functional dependencies
- // TODO: as a shortcut, instead of recognizing functional dependencies,
- // limit the contribution of a single table to the number of rows
- // of that table (so that when we're grouping by the primary key col plus
- // some others, the estimate doesn't overshoot dramatically)
- // cardinality: product of # of distinct values produced by grouping exprs
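- // Editorial example: grouping on two exprs with NDVs 10 and 1,000 yields an
- // initial estimate of 10 * 1,000 = 10,000 rows; the HAVING selectivity and
- // child-cardinality caps below then reduce it.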
-
- // Any non-grouping aggregation has at least one distinct value
- cardinality_ = aggInfo_.getGroupingExprs().isEmpty() ? 1 :
- Expr.getNumDistinctValues(aggInfo_.getGroupingExprs());
- // take HAVING predicate into account
- LOG.trace("Agg: cardinality=" + Long.toString(cardinality_));
- if (cardinality_ > 0) {
- cardinality_ = Math.round((double) cardinality_ * computeSelectivity());
- LOG.trace("sel=" + Double.toString(computeSelectivity()));
- }
- // if we ended up with an overflow, the estimate is certain to be wrong
- if (cardinality_ < 0) cardinality_ = -1;
- // Sanity check the cardinality_ based on the input cardinality_.
- if (getChild(0).getCardinality() != -1) {
- if (cardinality_ == -1) {
- // A worst-case cardinality_ is better than an unknown cardinality_.
- cardinality_ = getChild(0).getCardinality();
- } else {
- // An AggregationNode cannot increase the cardinality_.
- cardinality_ = Math.min(getChild(0).getCardinality(), cardinality_);
- }
- }
- cardinality_ = capAtLimit(cardinality_);
- LOG.trace("stats Agg: cardinality=" + Long.toString(cardinality_));
- }
-
- @Override
- protected String debugString() {
- return Objects.toStringHelper(this)
- .add("aggInfo", aggInfo_.debugString())
- .addValue(super.debugString())
- .toString();
- }
-
- @Override
- protected void toThrift(TPlanNode msg) {
- msg.node_type = TPlanNodeType.AGGREGATION_NODE;
-
- List<TExpr> aggregateFunctions = Lists.newArrayList();
- // only serialize agg exprs that are being materialized
- for (FunctionCallExpr e: aggInfo_.getMaterializedAggregateExprs()) {
- aggregateFunctions.add(e.treeToThrift());
- }
- aggInfo_.checkConsistency();
- msg.agg_node = new TAggregationNode(
- aggregateFunctions,
- aggInfo_.getIntermediateTupleId().asInt(),
- aggInfo_.getOutputTupleId().asInt(), needsFinalize_,
- useStreamingPreagg_,
- getChild(0).getCardinality());
- List<Expr> groupingExprs = aggInfo_.getGroupingExprs();
- if (groupingExprs != null) {
- msg.agg_node.setGrouping_exprs(Expr.treesToThrift(groupingExprs));
- }
- }
-
- @Override
- protected String getDisplayLabelDetail() {
- if (useStreamingPreagg_) return "STREAMING";
- if (needsFinalize_) return "FINALIZE";
- return null;
- }
-
- @Override
- protected String getNodeExplainString(String prefix, String detailPrefix,
- TExplainLevel detailLevel) {
- StringBuilder output = new StringBuilder();
- String nameDetail = getDisplayLabelDetail();
- output.append(String.format("%s%s", prefix, getDisplayLabel()));
- if (nameDetail != null) output.append(" [" + nameDetail + "]");
- output.append("\n");
-
- if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
- ArrayList<FunctionCallExpr> aggExprs = aggInfo_.getMaterializedAggregateExprs();
- if (!aggExprs.isEmpty()) {
- output.append(detailPrefix + "output: ")
- .append(getExplainString(aggExprs) + "\n");
- }
- // TODO: is this the best way to display this? It currently would
- // show DISTINCT_PC(DISTINCT_PC(col)) for the merge phase, but it's not
- // very obvious what that means if you don't already know.
-
- // TODO: group by can be very long. Break it into multiple lines
- if (!aggInfo_.getGroupingExprs().isEmpty()) {
- output.append(detailPrefix + "group by: ")
- .append(getExplainString(aggInfo_.getGroupingExprs()) + "\n");
- }
- if (!conjuncts_.isEmpty()) {
- output.append(detailPrefix + "having: ")
- .append(getExplainString(conjuncts_) + "\n");
- }
- }
- return output.toString();
- }
-
- @Override
- public void computeCosts(TQueryOptions queryOptions) {
- Preconditions.checkNotNull(fragment_,
- "PlanNode must be placed into a fragment before calling this method.");
- perHostMemCost_ = 0;
- long perHostCardinality = fragment_.getNumDistinctValues(aggInfo_.getGroupingExprs());
- if (perHostCardinality == -1) {
- perHostMemCost_ = DEFAULT_PER_HOST_MEM;
- return;
- }
-
- // Per-host cardinality cannot be greater than the total output cardinality.
- if (cardinality_ != -1) {
- perHostCardinality = Math.min(perHostCardinality, cardinality_);
- }
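-
- // Editorial example: with perHostCardinality = 1M and avgRowSize_ = 40 bytes,
- // the estimate is max(1M * 40 * HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM).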
- perHostMemCost_ += Math.max(perHostCardinality * avgRowSize_ *
- PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java
deleted file mode 100644
index ccbdaa2..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java
+++ /dev/null
@@ -1,249 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.AnalyticWindow;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.ExprSubstitutionMap;
-import com.cloudera.impala.analysis.OrderByElement;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.thrift.TAnalyticNode;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Computation of analytic exprs.
- */
-public class AnalyticEvalNode extends PlanNode {
- private final static Logger LOG = LoggerFactory.getLogger(AnalyticEvalNode.class);
-
- private List<Expr> analyticFnCalls_;
-
- // Partitioning exprs from the AnalyticInfo
- private final List<Expr> partitionExprs_;
-
- // TODO: Remove when the BE uses partitionByLessThan rather than the exprs
- private List<Expr> substitutedPartitionExprs_;
- private List<OrderByElement> orderByElements_;
- private final AnalyticWindow analyticWindow_;
-
- // Physical tuples used/produced by this analytic node.
- private final TupleDescriptor intermediateTupleDesc_;
- private final TupleDescriptor outputTupleDesc_;
-
- // maps from the logical output slots in logicalTupleDesc_ to their corresponding
- // physical output slots in outputTupleDesc_
- private final ExprSubstitutionMap logicalToPhysicalSmap_;
-
- // predicates constructed from partitionExprs_/orderingExprs_ to
- // compare input to buffered tuples
- private final Expr partitionByEq_;
- private final Expr orderByEq_;
- private final TupleDescriptor bufferedTupleDesc_;
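- // Editorial sketch: for PARTITION BY a, partitionByEq_ is conceptually a
- // predicate of the form "input.a IS NOT DISTINCT FROM buffered.a", evaluated
- // against the buffered tuple described by bufferedTupleDesc_.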
-
- public AnalyticEvalNode(
- PlanNodeId id, PlanNode input, List<Expr> analyticFnCalls,
- List<Expr> partitionExprs, List<OrderByElement> orderByElements,
- AnalyticWindow analyticWindow, TupleDescriptor intermediateTupleDesc,
- TupleDescriptor outputTupleDesc, ExprSubstitutionMap logicalToPhysicalSmap,
- Expr partitionByEq, Expr orderByEq, TupleDescriptor bufferedTupleDesc) {
- super(id, "ANALYTIC");
- Preconditions.checkState(!tupleIds_.contains(outputTupleDesc.getId()));
- analyticFnCalls_ = analyticFnCalls;
- partitionExprs_ = partitionExprs;
- orderByElements_ = orderByElements;
- analyticWindow_ = analyticWindow;
- intermediateTupleDesc_ = intermediateTupleDesc;
- outputTupleDesc_ = outputTupleDesc;
- logicalToPhysicalSmap_ = logicalToPhysicalSmap;
- partitionByEq_ = partitionByEq;
- orderByEq_ = orderByEq;
- bufferedTupleDesc_ = bufferedTupleDesc;
- children_.add(input);
- computeTupleIds();
- }
-
- @Override
- public void computeTupleIds() {
- clearTupleIds();
- tupleIds_.addAll(getChild(0).getTupleIds());
- // we're materializing the input row augmented with the analytic output tuple
- tupleIds_.add(outputTupleDesc_.getId());
- nullableTupleIds_.addAll(getChild(0).getNullableTupleIds());
- }
-
- @Override
- public boolean isBlockingNode() { return true; }
- public List<Expr> getPartitionExprs() { return partitionExprs_; }
- public List<OrderByElement> getOrderByElements() { return orderByElements_; }
-
- @Override
- public void init(Analyzer analyzer) {
- Preconditions.checkState(conjuncts_.isEmpty());
- computeMemLayout(analyzer);
- intermediateTupleDesc_.computeMemLayout();
-
- // we add the analyticInfo's smap to the combined smap of our child
- outputSmap_ = logicalToPhysicalSmap_;
- createDefaultSmap(analyzer);
-
- // Do not assign any conjuncts here: the conjuncts out of our SelectStmt's
- // Where clause have already been assigned, and conjuncts coming out of an
- // enclosing scope need to be evaluated *after* all analytic computations.
-
- // do this at the end so it can take all conjuncts into account
- computeStats(analyzer);
-
- LOG.trace("desctbl: " + analyzer.getDescTbl().debugString());
-
- // point fn calls, partition and ordering exprs at our input
- ExprSubstitutionMap childSmap = getCombinedChildSmap();
- analyticFnCalls_ = Expr.substituteList(analyticFnCalls_, childSmap, analyzer, false);
- substitutedPartitionExprs_ = Expr.substituteList(partitionExprs_, childSmap,
- analyzer, false);
- orderByElements_ = OrderByElement.substitute(orderByElements_, childSmap, analyzer);
- LOG.trace("evalnode: " + debugString());
- }
-
- @Override
- protected void computeStats(Analyzer analyzer) {
- super.computeStats(analyzer);
- cardinality_ = getChild(0).cardinality_;
- cardinality_ = capAtLimit(cardinality_);
- }
-
- @Override
- protected String debugString() {
- List<String> orderByElementStrs = Lists.newArrayList();
- for (OrderByElement element: orderByElements_) {
- orderByElementStrs.add(element.toSql());
- }
- return Objects.toStringHelper(this)
- .add("analyticFnCalls", Expr.debugString(analyticFnCalls_))
- .add("partitionExprs", Expr.debugString(partitionExprs_))
- .add("subtitutedPartitionExprs", Expr.debugString(substitutedPartitionExprs_))
- .add("orderByElements", Joiner.on(", ").join(orderByElementStrs))
- .add("window", analyticWindow_)
- .add("intermediateTid", intermediateTupleDesc_.getId())
- .add("outputTid", outputTupleDesc_.getId())
- .add("partitionByEq",
- partitionByEq_ != null ? partitionByEq_.debugString() : "null")
- .add("orderByEq",
- orderByEq_ != null ? orderByEq_.debugString() : "null")
- .addValue(super.debugString())
- .toString();
- }
-
- @Override
- protected void toThrift(TPlanNode msg) {
- msg.node_type = TPlanNodeType.ANALYTIC_EVAL_NODE;
- msg.analytic_node = new TAnalyticNode();
- msg.analytic_node.setIntermediate_tuple_id(intermediateTupleDesc_.getId().asInt());
- msg.analytic_node.setOutput_tuple_id(outputTupleDesc_.getId().asInt());
- msg.analytic_node.setPartition_exprs(Expr.treesToThrift(substitutedPartitionExprs_));
- msg.analytic_node.setOrder_by_exprs(
- Expr.treesToThrift(OrderByElement.getOrderByExprs(orderByElements_)));
- msg.analytic_node.setAnalytic_functions(Expr.treesToThrift(analyticFnCalls_));
- if (analyticWindow_ == null) {
- if (!orderByElements_.isEmpty()) {
- msg.analytic_node.setWindow(AnalyticWindow.DEFAULT_WINDOW.toThrift());
- }
- } else {
- // TODO: Window boundaries should have range_offset_predicate set
- msg.analytic_node.setWindow(analyticWindow_.toThrift());
- }
- if (partitionByEq_ != null) {
- msg.analytic_node.setPartition_by_eq(partitionByEq_.treeToThrift());
- }
- if (orderByEq_ != null) {
- msg.analytic_node.setOrder_by_eq(orderByEq_.treeToThrift());
- }
- if (bufferedTupleDesc_ != null) {
- msg.analytic_node.setBuffered_tuple_id(bufferedTupleDesc_.getId().asInt());
- }
- }
-
- @Override
- protected String getNodeExplainString(String prefix, String detailPrefix,
- TExplainLevel detailLevel) {
- StringBuilder output = new StringBuilder();
- output.append(String.format("%s%s", prefix, getDisplayLabel()));
- output.append("\n");
- if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
- output.append(detailPrefix + "functions: ");
- List<String> strings = Lists.newArrayList();
- for (Expr fnCall: analyticFnCalls_) {
- strings.add(fnCall.toSql());
- }
- output.append(Joiner.on(", ").join(strings));
- output.append("\n");
-
- if (!partitionExprs_.isEmpty()) {
- output.append(detailPrefix + "partition by: ");
- strings.clear();
- for (Expr partitionExpr: partitionExprs_) {
- strings.add(partitionExpr.toSql());
- }
- output.append(Joiner.on(", ").join(strings));
- output.append("\n");
- }
-
- if (!orderByElements_.isEmpty()) {
- output.append(detailPrefix + "order by: ");
- strings.clear();
- for (OrderByElement element: orderByElements_) {
- strings.add(element.toSql());
- }
- output.append(Joiner.on(", ").join(strings));
- output.append("\n");
- }
-
- if (analyticWindow_ != null) {
- output.append(detailPrefix + "window: ");
- output.append(analyticWindow_.toSql());
- output.append("\n");
- }
-
- if (!conjuncts_.isEmpty()) {
- output.append(
- detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n");
- }
- }
- return output.toString();
- }
-
- @Override
- public void computeCosts(TQueryOptions queryOptions) {
- Preconditions.checkNotNull(fragment_,
- "PlanNode must be placed into a fragment before calling this method.");
- // TODO: come up with estimate based on window
- perHostMemCost_ = 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java
deleted file mode 100644
index c02096e..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java
+++ /dev/null
@@ -1,815 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.AggregateInfoBase;
-import com.cloudera.impala.analysis.AnalyticExpr;
-import com.cloudera.impala.analysis.AnalyticInfo;
-import com.cloudera.impala.analysis.AnalyticWindow;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.BoolLiteral;
-import com.cloudera.impala.analysis.CompoundPredicate;
-import com.cloudera.impala.analysis.CompoundPredicate.Operator;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.ExprSubstitutionMap;
-import com.cloudera.impala.analysis.IsNullPredicate;
-import com.cloudera.impala.analysis.OrderByElement;
-import com.cloudera.impala.analysis.SlotDescriptor;
-import com.cloudera.impala.analysis.SlotRef;
-import com.cloudera.impala.analysis.SortInfo;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.analysis.TupleId;
-import com.cloudera.impala.analysis.TupleIsNullPredicate;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.thrift.TPartitionType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-
-/**
- * The analytic planner adds plan nodes to an existing plan tree in order to
- * implement the AnalyticInfo of a given query stmt. The resulting plan reflects
- * similarities among analytic exprs with respect to partitioning, ordering and
- * windowing to reduce data exchanges and sorts (the exchanges and sorts are currently
- * not minimal). The generated plan has the following structure:
- * ...
- * (
- * (
- * (
- * analytic node <-- group of analytic exprs with compatible window
- * )+ <-- group of analytic exprs with compatible ordering
- * sort node?
- * )+ <-- group of analytic exprs with compatible partitioning
- * hash exchange?
- * )* <-- group of analytic exprs that have different partitioning
- * input plan node
- * ...
- */
-public class AnalyticPlanner {
- private final static Logger LOG = LoggerFactory.getLogger(AnalyticPlanner.class);
-
- private final AnalyticInfo analyticInfo_;
- private final Analyzer analyzer_;
- private final PlannerContext ctx_;
-
- public AnalyticPlanner(AnalyticInfo analyticInfo, Analyzer analyzer,
- PlannerContext ctx) {
- analyticInfo_ = analyticInfo;
- analyzer_ = analyzer;
- ctx_ = ctx;
- }
-
- /**
- * Returns a plan tree that augments 'root' with plan nodes that implement single-node
- * evaluation of the AnalyticExprs in analyticInfo.
- * This plan takes into account a possible hash partition of its input on
- * 'groupingExprs'; if this is non-null, it returns in 'inputPartitionExprs'
- * a subset of the grouping exprs which should be used for the aggregate
- * hash partitioning during the parallelization of 'root'.
- * TODO: when generating sort orders for the sort groups, optimize the ordering
- * of the partition exprs (so that subsequent sort operations see the input sorted
- * on a prefix of their required sort exprs)
- * TODO: when merging sort groups, recognize equivalent exprs
- * (using the equivalence classes) rather than looking for expr equality
- */
- public PlanNode createSingleNodePlan(PlanNode root,
- List<Expr> groupingExprs, List<Expr> inputPartitionExprs) throws ImpalaException {
- List<WindowGroup> windowGroups = collectWindowGroups();
- for (int i = 0; i < windowGroups.size(); ++i) {
- windowGroups.get(i).init(analyzer_, "wg-" + i);
- }
- List<SortGroup> sortGroups = collectSortGroups(windowGroups);
- mergeSortGroups(sortGroups);
- for (SortGroup g: sortGroups) {
- g.init();
- }
- List<PartitionGroup> partitionGroups = collectPartitionGroups(sortGroups);
- mergePartitionGroups(partitionGroups, root.getNumNodes());
- orderGroups(partitionGroups);
- if (groupingExprs != null) {
- Preconditions.checkNotNull(inputPartitionExprs);
- computeInputPartitionExprs(
- partitionGroups, groupingExprs, root.getNumNodes(), inputPartitionExprs);
- }
-
- for (PartitionGroup partitionGroup: partitionGroups) {
- for (int i = 0; i < partitionGroup.sortGroups.size(); ++i) {
- root = createSortGroupPlan(root, partitionGroup.sortGroups.get(i),
- i == 0 ? partitionGroup.partitionByExprs : null);
- }
- }
-
- // create equiv classes for newly added slots
- analyzer_.createIdentityEquivClasses();
- return root;
- }
-
- /**
- * Coalesce sort groups that have compatible partition-by exprs and
- * have a prefix relationship.
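- * For example (editorial): a group sorted on (a) is a prefix of one sorted on
- * (a, b), so the two can be coalesced into a single sort on (a, b).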
- */
- private void mergeSortGroups(List<SortGroup> sortGroups) {
- boolean hasMerged = false;
- do {
- hasMerged = false;
- for (SortGroup sg1: sortGroups) {
- for (SortGroup sg2: sortGroups) {
- if (sg1 != sg2 && sg1.isPrefixOf(sg2)) {
- sg1.absorb(sg2);
- sortGroups.remove(sg2);
- hasMerged = true;
- break;
- }
- }
- if (hasMerged) break;
- }
- } while (hasMerged);
- }
-
- /**
- * Coalesce partition groups for which the intersection of their
- * partition exprs has ndv estimate > numNodes, so that the resulting plan
- * still parallelizes across all nodes.
- */
- private void mergePartitionGroups(
- List<PartitionGroup> partitionGroups, int numNodes) {
- boolean hasMerged = false;
- do {
- hasMerged = false;
- for (PartitionGroup pg1: partitionGroups) {
- for (PartitionGroup pg2: partitionGroups) {
- if (pg1 != pg2) {
- long ndv = Expr.getNumDistinctValues(
- Expr.intersect(pg1.partitionByExprs, pg2.partitionByExprs));
- if (ndv < 0 || ndv < numNodes) {
- // didn't get a usable value or the number of partitions is too small
- continue;
- }
- pg1.merge(pg2);
- partitionGroups.remove(pg2);
- hasMerged = true;
- break;
- }
- }
- if (hasMerged) break;
- }
- } while (hasMerged);
- }
-
- /**
- * Determine the partition group that has the maximum intersection in terms
- * of the estimated ndv of the partition exprs with groupingExprs.
- * That partition group is placed at the front of partitionGroups, with its
- * partition exprs reduced to the intersection, and the intersecting groupingExprs
- * are returned in inputPartitionExprs.
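- * For example (editorial): with groupingExprs (a, b) and a partition group on
- * (a, c), the intersection is (a); if NDV(a) exceeds numNodes, (a) becomes that
- * group's partition exprs and is returned in inputPartitionExprs.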
- */
- private void computeInputPartitionExprs(List<PartitionGroup> partitionGroups,
- List<Expr> groupingExprs, int numNodes, List<Expr> inputPartitionExprs) {
- inputPartitionExprs.clear();
- Preconditions.checkState(numNodes != -1);
- // find partition group with maximum intersection
- long maxNdv = 0;
- PartitionGroup maxPg = null;
- List<Expr> maxGroupingExprs = null;
- for (PartitionGroup pg: partitionGroups) {
- List<Expr> l1 = Lists.newArrayList();
- List<Expr> l2 = Lists.newArrayList();
- Expr.intersect(analyzer_, pg.partitionByExprs, groupingExprs,
- analyzer_.getEquivClassSmap(), l1, l2);
- // TODO: also look at l2 and take the max?
- long ndv = Expr.getNumDistinctValues(l1);
- if (ndv < 0 || ndv < numNodes || ndv < maxNdv) continue;
- // found a better partition group
- maxPg = pg;
- maxPg.partitionByExprs = l1;
- maxGroupingExprs = l2;
- maxNdv = ndv;
- }
-
- if (maxNdv > numNodes) {
- Preconditions.checkNotNull(maxPg);
- // we found a partition group that gives us enough parallelism;
- // move it to the front
- partitionGroups.remove(maxPg);
- partitionGroups.add(0, maxPg);
- inputPartitionExprs.addAll(maxGroupingExprs);
- }
- }
-
- /**
- * Order partition groups (and the sort groups within them) by increasing
- * totalOutputTupleSize. This minimizes the total volume of data that needs to be
- * repartitioned and sorted.
- * Also move the non-partitioning partition group to the end.
- */
- private void orderGroups(List<PartitionGroup> partitionGroups) {
- // remove the non-partitioning group from partitionGroups
- PartitionGroup nonPartitioning = null;
- for (PartitionGroup pg: partitionGroups) {
- if (pg.partitionByExprs.isEmpty()) {
- nonPartitioning = pg;
- break;
- }
- }
- if (nonPartitioning != null) partitionGroups.remove(nonPartitioning);
-
- // order by ascending combined output tuple size
- Collections.sort(partitionGroups,
- new Comparator<PartitionGroup>() {
- public int compare(PartitionGroup pg1, PartitionGroup pg2) {
- Preconditions.checkState(pg1.totalOutputTupleSize > 0);
- Preconditions.checkState(pg2.totalOutputTupleSize > 0);
- int diff = pg1.totalOutputTupleSize - pg2.totalOutputTupleSize;
- return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
- }
- });
- if (nonPartitioning != null) partitionGroups.add(nonPartitioning);
-
- for (PartitionGroup pg: partitionGroups) {
- pg.orderSortGroups();
- }
- }
-
- /**
- * Create SortInfo, including sort tuple, to sort entire input row
- * on sortExprs.
- */
- private SortInfo createSortInfo(
- PlanNode input, List<Expr> sortExprs, List<Boolean> isAsc,
- List<Boolean> nullsFirst) {
- // create tuple for sort output = the entire materialized input in a single tuple
- TupleDescriptor sortTupleDesc =
- analyzer_.getDescTbl().createTupleDescriptor("sort-tuple");
- ExprSubstitutionMap sortSmap = new ExprSubstitutionMap();
- List<Expr> sortSlotExprs = Lists.newArrayList();
- sortTupleDesc.setIsMaterialized(true);
- for (TupleId tid: input.getTupleIds()) {
- TupleDescriptor tupleDesc = analyzer_.getTupleDesc(tid);
- for (SlotDescriptor inputSlotDesc: tupleDesc.getSlots()) {
- if (!inputSlotDesc.isMaterialized()) continue;
- SlotDescriptor sortSlotDesc =
- analyzer_.copySlotDescriptor(inputSlotDesc, sortTupleDesc);
- // all output slots need to be materialized
- sortSlotDesc.setIsMaterialized(true);
- sortSmap.put(new SlotRef(inputSlotDesc), new SlotRef(sortSlotDesc));
- sortSlotExprs.add(new SlotRef(inputSlotDesc));
- }
- }
-
- // Lhs exprs to be substituted in ancestor plan nodes could have a rhs that contains
- // TupleIsNullPredicates. TupleIsNullPredicates require specific tuple ids for
- // evaluation. Since this sort materializes a new tuple, it's impossible to evaluate
- // TupleIsNullPredicates referring to this sort's input after this sort. To
- // preserve the information of whether an input tuple was null or not past this
- // sort node, we materialize those rhs TupleIsNullPredicates, which are then substituted
- // by a SlotRef into the sort's tuple in ancestor nodes (IMPALA-1519).
- ExprSubstitutionMap inputSmap = input.getOutputSmap();
- if (inputSmap != null) {
- List<Expr> tupleIsNullPredsToMaterialize = Lists.newArrayList();
- for (int i = 0; i < inputSmap.size(); ++i) {
- Expr rhsExpr = inputSmap.getRhs().get(i);
- // Ignore substitutions that are irrelevant at this plan node and its ancestors.
- if (!rhsExpr.isBoundByTupleIds(input.getTupleIds())) continue;
- rhsExpr.collect(TupleIsNullPredicate.class, tupleIsNullPredsToMaterialize);
- }
- Expr.removeDuplicates(tupleIsNullPredsToMaterialize);
-
- // Materialize relevant unique TupleIsNullPredicates.
- for (Expr tupleIsNullPred: tupleIsNullPredsToMaterialize) {
- SlotDescriptor sortSlotDesc = analyzer_.addSlotDescriptor(sortTupleDesc);
- sortSlotDesc.setType(tupleIsNullPred.getType());
- sortSlotDesc.setIsMaterialized(true);
- sortSlotDesc.setSourceExpr(tupleIsNullPred);
- sortSlotDesc.setLabel(tupleIsNullPred.toSql());
- sortSlotExprs.add(tupleIsNullPred.clone());
- }
- }
-
- SortInfo sortInfo = new SortInfo(
- Expr.substituteList(sortExprs, sortSmap, analyzer_, false), isAsc, nullsFirst);
- LOG.trace("sortinfo exprs: " + Expr.debugString(sortInfo.getOrderingExprs()));
- sortInfo.setMaterializedTupleInfo(sortTupleDesc, sortSlotExprs);
- return sortInfo;
- }
-
- /**
- * Create plan tree for the entire sort group, including all contained window groups.
- * Marks the SortNode as requiring its input to be partitioned if partitionExprs
- * is not null (partitionExprs represent the data partition of the entire partition
- * group of which this sort group is a part).
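- * The resulting plan is a SortNode (if any partition-by or order-by exprs are
- * present) with one AnalyticEvalNode per window group stacked on top of it.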
- */
- private PlanNode createSortGroupPlan(PlanNode root, SortGroup sortGroup,
- List<Expr> partitionExprs) throws ImpalaException {
- List<Expr> partitionByExprs = sortGroup.partitionByExprs;
- List<OrderByElement> orderByElements = sortGroup.orderByElements;
- ExprSubstitutionMap sortSmap = null;
- TupleId sortTupleId = null;
- TupleDescriptor bufferedTupleDesc = null;
- // map from input to buffered tuple
- ExprSubstitutionMap bufferedSmap = new ExprSubstitutionMap();
-
- // sort on partition by (pb) + order by (ob) exprs and create pb/ob predicates
- if (!partitionByExprs.isEmpty() || !orderByElements.isEmpty()) {
- // first sort on partitionExprs (direction doesn't matter)
- List<Expr> sortExprs = Lists.newArrayList(partitionByExprs);
- List<Boolean> isAsc =
- Lists.newArrayList(Collections.nCopies(sortExprs.size(), Boolean.TRUE));
- // TODO: utilize a direction and nulls first/last that benefits
- // subsequent sort groups
- List<Boolean> nullsFirst =
- Lists.newArrayList(Collections.nCopies(sortExprs.size(), Boolean.TRUE));
-
- // then sort on orderByExprs
- for (OrderByElement orderByElement: sortGroup.orderByElements) {
- sortExprs.add(orderByElement.getExpr());
- isAsc.add(orderByElement.isAsc());
- nullsFirst.add(orderByElement.getNullsFirstParam());
- }
-
- SortInfo sortInfo = createSortInfo(root, sortExprs, isAsc, nullsFirst);
- SortNode sortNode = new SortNode(ctx_.getNextNodeId(), root, sortInfo, false, 0);
-
- // if this sort group does not have partitioning exprs, we want the sort
- // to be executed like a regular distributed sort
- if (!partitionByExprs.isEmpty()) sortNode.setIsAnalyticSort(true);
-
- if (partitionExprs != null) {
- // create required input partition
- DataPartition inputPartition = DataPartition.UNPARTITIONED;
- if (!partitionExprs.isEmpty()) {
- inputPartition = DataPartition.hashPartitioned(partitionExprs);
- }
- sortNode.setInputPartition(inputPartition);
- }
-
- root = sortNode;
- root.init(analyzer_);
- sortSmap = sortNode.getOutputSmap();
-
- // create bufferedTupleDesc and bufferedSmap
- sortTupleId = sortNode.tupleIds_.get(0);
- bufferedTupleDesc =
- analyzer_.getDescTbl().copyTupleDescriptor(sortTupleId, "buffered-tuple");
- LOG.trace("desctbl: " + analyzer_.getDescTbl().debugString());
-
- List<SlotDescriptor> inputSlots = analyzer_.getTupleDesc(sortTupleId).getSlots();
- List<SlotDescriptor> bufferedSlots = bufferedTupleDesc.getSlots();
- for (int i = 0; i < inputSlots.size(); ++i) {
- bufferedSmap.put(
- new SlotRef(inputSlots.get(i)), new SlotRef(bufferedSlots.get(i)));
- }
- }
-
- // create one AnalyticEvalNode per window group
- for (WindowGroup windowGroup: sortGroup.windowGroups) {
- // Create partition-by (pb) and order-by (ob) less-than predicates between the
- // input tuple (the output of the preceding sort) and a buffered tuple that is
- // identical to the input tuple. We need a different tuple descriptor for the
- // buffered tuple because the generated predicates should compare two different
- // tuple instances from the same input stream (i.e., the predicates should be
- // evaluated over a row that is composed of the input and the buffered tuple).
-
- // we need to remap the pb/ob exprs to a) the sort output, b) our buffer of the
- // sort input
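- // E.g., for a single partition-by expr 'a' (a hypothetical example),
- // partitionByEq roughly takes the form
- // (input.a IS NULL AND buffered.a IS NULL) OR
- // (input.a IS NOT NULL AND buffered.a IS NOT NULL AND input.a = buffered.a)
- // (see createNullMatchingEquals() below).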
- Expr partitionByEq = null;
- if (!windowGroup.partitionByExprs.isEmpty()) {
- partitionByEq = createNullMatchingEquals(
- Expr.substituteList(windowGroup.partitionByExprs, sortSmap, analyzer_, false),
- sortTupleId, bufferedSmap);
- LOG.trace("partitionByEq: " + partitionByEq.debugString());
- }
- Expr orderByEq = null;
- if (!windowGroup.orderByElements.isEmpty()) {
- orderByEq = createNullMatchingEquals(
- OrderByElement.getOrderByExprs(OrderByElement.substitute(
- windowGroup.orderByElements, sortSmap, analyzer_)),
- sortTupleId, bufferedSmap);
- LOG.trace("orderByEq: " + orderByEq.debugString());
- }
-
- root = new AnalyticEvalNode(ctx_.getNextNodeId(), root,
- windowGroup.analyticFnCalls, windowGroup.partitionByExprs,
- windowGroup.orderByElements, windowGroup.window,
- windowGroup.physicalIntermediateTuple, windowGroup.physicalOutputTuple,
- windowGroup.logicalToPhysicalSmap,
- partitionByEq, orderByEq, bufferedTupleDesc);
- root.init(analyzer_);
- }
- return root;
- }
-
- /**
- * Create a predicate that checks if all exprs are equal or both sides are null.
- */
- private Expr createNullMatchingEquals(List<Expr> exprs, TupleId inputTid,
- ExprSubstitutionMap bufferedSmap) {
- Preconditions.checkState(!exprs.isEmpty());
- Expr result = createNullMatchingEqualsAux(exprs, 0, inputTid, bufferedSmap);
- result.analyzeNoThrow(analyzer_);
- return result;
- }
-
- /**
- * Create an unanalyzed predicate that checks if all elements at position >= i are
- * equal or both sides are null.
- *
- * The predicate has the form
- * ((lhs[i] is null && rhs[i] is null) || (
- * lhs[i] is not null && rhs[i] is not null && lhs[i] = rhs[i]))
- * && <createNullMatchingEqualsAux(i + 1)>
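- *
- * For example (a hypothetical two-element case), for elements [x, y] and i = 0
- * this expands, up to grouping, to:
- * ((x is null && x' is null) || (x is not null && x' is not null && x = x'))
- * && ((y is null && y' is null) || (y is not null && y' is not null && y = y'))
- * && TRUE
- * where x' and y' denote x and y after substitution with bufferedSmap.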
- */
- private Expr createNullMatchingEqualsAux(List<Expr> elements, int i,
- TupleId inputTid, ExprSubstitutionMap bufferedSmap) {
- if (i >= elements.size()) return new BoolLiteral(true);
-
- // compare elements[i]
- Expr lhs = elements.get(i);
- Preconditions.checkState(lhs.isBound(inputTid));
- Expr rhs = lhs.substitute(bufferedSmap, analyzer_, false);
-
- Expr bothNull = new CompoundPredicate(Operator.AND,
- new IsNullPredicate(lhs, false), new IsNullPredicate(rhs, false));
- Expr lhsEqRhsNotNull = new CompoundPredicate(Operator.AND,
- new CompoundPredicate(Operator.AND,
- new IsNullPredicate(lhs, true), new IsNullPredicate(rhs, true)),
- new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs));
- Expr remainder = createNullMatchingEqualsAux(elements, i + 1, inputTid, bufferedSmap);
- return new CompoundPredicate(Operator.AND,
- new CompoundPredicate(Operator.OR, bothNull, lhsEqRhsNotNull),
- remainder);
- }
-
- /**
- * Collection of AnalyticExprs that share the same partition-by/order-by/window
- * specification. The AnalyticExprs are stored broken up into their constituent parts.
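- * For example, sum(x) over (partition by a order by b) and
- * avg(y) over (partition by a order by b), with identical windows, fall into
- * the same window group and are evaluated by a single AnalyticEvalNode.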
- */
- private static class WindowGroup {
- public final List<Expr> partitionByExprs;
- public final List<OrderByElement> orderByElements;
- public final AnalyticWindow window; // not null
-
- // Analytic exprs belonging to this window group and their corresponding logical
- // intermediate and output slots from AnalyticInfo.intermediateTupleDesc_
- // and AnalyticInfo.outputTupleDesc_.
- public final List<AnalyticExpr> analyticExprs = Lists.newArrayList();
- // Result of getFnCall() for every analytic expr.
- public final List<Expr> analyticFnCalls = Lists.newArrayList();
- public final List<SlotDescriptor> logicalOutputSlots = Lists.newArrayList();
- public final List<SlotDescriptor> logicalIntermediateSlots = Lists.newArrayList();
-
- // Physical output and intermediate tuples as well as an smap that maps the
- // corresponding logical output slots to their physical slots in physicalOutputTuple.
- // Set in init().
- public TupleDescriptor physicalOutputTuple;
- public TupleDescriptor physicalIntermediateTuple;
- public final ExprSubstitutionMap logicalToPhysicalSmap = new ExprSubstitutionMap();
-
- public WindowGroup(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot,
- SlotDescriptor logicalIntermediateSlot) {
- partitionByExprs = analyticExpr.getPartitionExprs();
- orderByElements = analyticExpr.getOrderByElements();
- window = analyticExpr.getWindow();
- analyticExprs.add(analyticExpr);
- analyticFnCalls.add(analyticExpr.getFnCall());
- logicalOutputSlots.add(logicalOutputSlot);
- logicalIntermediateSlots.add(logicalIntermediateSlot);
- }
-
- /**
- * True if this analytic function must be evaluated in its own WindowGroup.
- */
- private static boolean requiresIndependentEval(AnalyticExpr analyticExpr) {
- return analyticExpr.getFnCall().getFnName().getFunction().equals(
- AnalyticExpr.FIRST_VALUE_REWRITE);
- }
-
- /**
- * True if analyticExpr's partition exprs, ordering elements, and window
- * match ours.
- */
- public boolean isCompatible(AnalyticExpr analyticExpr) {
- if (requiresIndependentEval(analyticExprs.get(0)) ||
- requiresIndependentEval(analyticExpr)) {
- return false;
- }
-
- if (!Expr.equalSets(analyticExpr.getPartitionExprs(), partitionByExprs)) {
- return false;
- }
- if (!analyticExpr.getOrderByElements().equals(orderByElements)) return false;
- if ((window == null) != (analyticExpr.getWindow() == null)) return false;
- if (window == null) return true;
- return analyticExpr.getWindow().equals(window);
- }
-
- /**
- * Adds the given analytic expr and its logical slots to this window group.
- * Assumes the corresponding analyticExpr is compatible with 'this'.
- */
- public void add(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot,
- SlotDescriptor logicalIntermediateSlot) {
- Preconditions.checkState(isCompatible(analyticExpr));
- analyticExprs.add(analyticExpr);
- analyticFnCalls.add(analyticExpr.getFnCall());
- logicalOutputSlots.add(logicalOutputSlot);
- logicalIntermediateSlots.add(logicalIntermediateSlot);
- }
-
- /**
- * Creates the physical output and intermediate tuples as well as the logical to
- * physical smap for this window group. Computes the mem layout for the tuple
- * descriptors.
- */
- public void init(Analyzer analyzer, String tupleName) {
- Preconditions.checkState(physicalOutputTuple == null);
- Preconditions.checkState(physicalIntermediateTuple == null);
- Preconditions.checkState(analyticFnCalls.size() == analyticExprs.size());
-
- // If needed, create the intermediate tuple first to maintain
- // intermediateTupleId < outputTupleId for debugging purposes and consistency with
- // tuple creation for aggregations.
- boolean requiresIntermediateTuple =
- AggregateInfoBase.requiresIntermediateTuple(analyticFnCalls);
- if (requiresIntermediateTuple) {
- physicalIntermediateTuple =
- analyzer.getDescTbl().createTupleDescriptor(tupleName + "intermed");
- physicalOutputTuple =
- analyzer.getDescTbl().createTupleDescriptor(tupleName + "out");
- } else {
- physicalOutputTuple =
- analyzer.getDescTbl().createTupleDescriptor(tupleName + "out");
- physicalIntermediateTuple = physicalOutputTuple;
- }
-
- Preconditions.checkState(analyticExprs.size() == logicalIntermediateSlots.size());
- Preconditions.checkState(analyticExprs.size() == logicalOutputSlots.size());
- for (int i = 0; i < analyticExprs.size(); ++i) {
- SlotDescriptor logicalOutputSlot = logicalOutputSlots.get(i);
- SlotDescriptor physicalOutputSlot =
- analyzer.copySlotDescriptor(logicalOutputSlot, physicalOutputTuple);
- physicalOutputSlot.setIsMaterialized(true);
- if (requiresIntermediateTuple) {
- SlotDescriptor logicalIntermediateSlot = logicalIntermediateSlots.get(i);
- SlotDescriptor physicalIntermediateSlot = analyzer.copySlotDescriptor(
- logicalIntermediateSlot, physicalIntermediateTuple);
- physicalIntermediateSlot.setIsMaterialized(true);
- }
- logicalToPhysicalSmap.put(
- new SlotRef(logicalOutputSlot), new SlotRef(physicalOutputSlot));
- }
- physicalOutputTuple.computeMemLayout();
- if (requiresIntermediateTuple) physicalIntermediateTuple.computeMemLayout();
- }
- }
-
- /**
- * Extract a minimal set of WindowGroups from analyticExprs.
- */
- private List<WindowGroup> collectWindowGroups() {
- List<Expr> analyticExprs = analyticInfo_.getAnalyticExprs();
- List<WindowGroup> groups = Lists.newArrayList();
- for (int i = 0; i < analyticExprs.size(); ++i) {
- AnalyticExpr analyticExpr = (AnalyticExpr) analyticExprs.get(i);
- // Do not generate the plan for non-materialized analytic exprs.
- if (!analyticInfo_.getOutputTupleDesc().getSlots().get(i).isMaterialized()) {
- continue;
- }
- boolean match = false;
- for (WindowGroup group: groups) {
- if (group.isCompatible(analyticExpr)) {
- // analyticExpr is already the downcast result of getAnalyticExprs().get(i)
- group.add(analyticExpr,
- analyticInfo_.getOutputTupleDesc().getSlots().get(i),
- analyticInfo_.getIntermediateTupleDesc().getSlots().get(i));
- match = true;
- break;
- }
- }
- if (!match) {
- groups.add(new WindowGroup(analyticExpr,
- analyticInfo_.getOutputTupleDesc().getSlots().get(i),
- analyticInfo_.getIntermediateTupleDesc().getSlots().get(i)));
- }
- }
- return groups;
- }
-
- /**
- * Collection of WindowGroups that share the same partition-by/order-by
- * specification.
- */
- private static class SortGroup {
- public List<Expr> partitionByExprs;
- public List<OrderByElement> orderByElements;
- public List<WindowGroup> windowGroups = Lists.newArrayList();
-
- // sum of windowGroups.physicalOutputTuple.getByteSize()
- public int totalOutputTupleSize = -1;
-
- public SortGroup(WindowGroup windowGroup) {
- partitionByExprs = windowGroup.partitionByExprs;
- orderByElements = windowGroup.orderByElements;
- windowGroups.add(windowGroup);
- }
-
- /**
- * True if the partition and ordering exprs of windowGroup match ours.
- */
- public boolean isCompatible(WindowGroup windowGroup) {
- return Expr.equalSets(windowGroup.partitionByExprs, partitionByExprs)
- && windowGroup.orderByElements.equals(orderByElements);
- }
-
- public void add(WindowGroup windowGroup) {
- Preconditions.checkState(isCompatible(windowGroup));
- windowGroups.add(windowGroup);
- }
-
- /**
- * Return true if 'this' and other have compatible partition exprs and
- * other's orderByElements are a prefix of ours.
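- * For example, if we sort on (a, b, c) and other sorts on (a, b), our sort also
- * provides other's ordering, so other can be absorbed into this sort group.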
- */
- public boolean isPrefixOf(SortGroup other) {
- if (other.orderByElements.size() > orderByElements.size()) return false;
- if (!Expr.equalSets(partitionByExprs, other.partitionByExprs)) return false;
- for (int i = 0; i < other.orderByElements.size(); ++i) {
- OrderByElement ob = orderByElements.get(i);
- OrderByElement otherOb = other.orderByElements.get(i);
- // TODO: compare equiv classes by comparing each equiv class's placeholder
- // slotref
- if (!ob.getExpr().equals(otherOb.getExpr())) return false;
- if (ob.isAsc() != otherOb.isAsc()) return false;
- if (ob.nullsFirst() != otherOb.nullsFirst()) return false;
- }
- return true;
- }
-
- /**
- * Adds other's window groups to ours, assuming that other's ordering is a
- * prefix of ours (our sort also provides other's ordering).
- */
- public void absorb(SortGroup other) {
- Preconditions.checkState(isPrefixOf(other));
- windowGroups.addAll(other.windowGroups);
- }
-
- /**
- * Compute totalOutputTupleSize.
- */
- public void init() {
- totalOutputTupleSize = 0;
- for (WindowGroup g: windowGroups) {
- TupleDescriptor outputTuple = g.physicalOutputTuple;
- Preconditions.checkState(outputTuple.isMaterialized());
- Preconditions.checkState(outputTuple.getByteSize() != -1);
- totalOutputTupleSize += outputTuple.getByteSize();
- }
- }
-
- private static class SizeLt implements Comparator<WindowGroup> {
- @Override
- public int compare(WindowGroup wg1, WindowGroup wg2) {
- Preconditions.checkState(wg1.physicalOutputTuple != null
- && wg1.physicalOutputTuple.getByteSize() != -1);
- Preconditions.checkState(wg2.physicalOutputTuple != null
- && wg2.physicalOutputTuple.getByteSize() != -1);
- return Integer.compare(wg1.physicalOutputTuple.getByteSize(),
- wg2.physicalOutputTuple.getByteSize());
- }
- }
-
- private static final SizeLt SIZE_LT = new SizeLt();
-
- /**
- * Order window groups by increasing size of the output tuple. This minimizes
- * the total volume of data that needs to be buffered.
- */
- public void orderWindowGroups() {
- Collections.sort(windowGroups, SIZE_LT);
- }
- }
-
- /**
- * Partitions the windowGroups into SortGroups based on compatible partition-by
- * and order-by exprs.
- */
- private List<SortGroup> collectSortGroups(List<WindowGroup> windowGroups) {
- List<SortGroup> sortGroups = Lists.newArrayList();
- for (WindowGroup windowGroup: windowGroups) {
- boolean match = false;
- for (SortGroup sortGroup: sortGroups) {
- if (sortGroup.isCompatible(windowGroup)) {
- sortGroup.add(windowGroup);
- match = true;
- break;
- }
- }
- if (!match) sortGroups.add(new SortGroup(windowGroup));
- }
- return sortGroups;
- }
-
- /**
- * Collection of SortGroups that have compatible partition-by specifications.
- */
- private static class PartitionGroup {
- public List<Expr> partitionByExprs;
- public List<SortGroup> sortGroups = Lists.newArrayList();
-
- // sum of sortGroups.windowGroups.physicalOutputTuple.getByteSize()
- public int totalOutputTupleSize = -1;
-
- public PartitionGroup(SortGroup sortGroup) {
- partitionByExprs = sortGroup.partitionByExprs;
- sortGroups.add(sortGroup);
- totalOutputTupleSize = sortGroup.totalOutputTupleSize;
- }
-
- /**
- * True if the partition exprs of sortGroup are compatible with ours.
- * For now that means equality.
- */
- public boolean isCompatible(SortGroup sortGroup) {
- return Expr.equalSets(sortGroup.partitionByExprs, partitionByExprs);
- }
-
- public void add(SortGroup sortGroup) {
- Preconditions.checkState(isCompatible(sortGroup));
- sortGroups.add(sortGroup);
- totalOutputTupleSize += sortGroup.totalOutputTupleSize;
- }
-
- /**
- * Merge 'other' into 'this'
- * - partitionByExprs is the intersection of the two
- * - sortGroups becomes the union
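- * For example, merging groups partitioned on (a, b) and on (a) yields a group
- * partitioned on (a); hash partitioning on (a) colocates the rows of every
- * (a, b) partition as well.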
- */
- public void merge(PartitionGroup other) {
- partitionByExprs = Expr.intersect(partitionByExprs, other.partitionByExprs);
- Preconditions.checkState(Expr.getNumDistinctValues(partitionByExprs) >= 0);
- sortGroups.addAll(other.sortGroups);
- }
-
- /**
- * Order sort groups by increasing totalOutputTupleSize. This minimizes the total
- * volume of data that needs to be sorted.
- */
- public void orderSortGroups() {
- Collections.sort(sortGroups,
- new Comparator<SortGroup>() {
- @Override
- public int compare(SortGroup sg1, SortGroup sg2) {
- Preconditions.checkState(sg1.totalOutputTupleSize > 0);
- Preconditions.checkState(sg2.totalOutputTupleSize > 0);
- return Integer.compare(sg1.totalOutputTupleSize, sg2.totalOutputTupleSize);
- }
- });
- for (SortGroup sortGroup: sortGroups) {
- sortGroup.orderWindowGroups();
- }
- }
- }
-
- /**
- * Extract a minimal set of PartitionGroups from sortGroups.
- */
- private List<PartitionGroup> collectPartitionGroups(List<SortGroup> sortGroups) {
- List<PartitionGroup> partitionGroups = Lists.newArrayList();
- for (SortGroup sortGroup: sortGroups) {
- boolean match = false;
- for (PartitionGroup partitionGroup: partitionGroups) {
- if (partitionGroup.isCompatible(sortGroup)) {
- partitionGroup.add(sortGroup);
- match = true;
- break;
- }
- }
- if (!match) partitionGroups.add(new PartitionGroup(sortGroup));
- }
- return partitionGroups;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/CohortId.java b/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
deleted file mode 100644
index d58e5c4..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import com.cloudera.impala.common.Id;
-import com.cloudera.impala.common.IdGenerator;
-
-public class CohortId extends Id<CohortId> {
- // Construction only allowed via an IdGenerator.
- protected CohortId(int id) {
- super(id);
- }
-
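- // A minimal usage sketch (assuming the inherited nextId_ counter starts at 0):
- // IdGenerator<CohortId> gen = CohortId.createGenerator();
- // gen.getNextId().toString(); // "00"
- // gen.getNextId().toString(); // "01"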
- public static IdGenerator<CohortId> createGenerator() {
- return new IdGenerator<CohortId>() {
- @Override
- public CohortId getNextId() { return new CohortId(nextId_++); }
- @Override
- public CohortId getMaxId() { return new CohortId(nextId_ - 1); }
- };
- }
-
- @Override
- public String toString() {
- return String.format("%02d", id_);
- }
-}