You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:40 UTC

[23/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java b/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java
deleted file mode 100644
index 242c704..0000000
--- a/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java
+++ /dev/null
@@ -1,643 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.hive.executor;
-
-import java.io.File;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-
-import org.apache.hadoop.hive.ql.exec.UDF;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
-import org.apache.thrift.protocol.TBinaryProtocol;
-
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.catalog.PrimitiveType;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.common.JniUtil;
-import com.cloudera.impala.thrift.THiveUdfExecutorCtorParams;
-import com.cloudera.impala.thrift.TPrimitiveType;
-import com.cloudera.impala.util.UnsafeUtil;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-// Wrapper object to run hive UDFs. This class works with UdfCallExpr in the
-// backend to marshall data back and forth between the execution engine and
-// the java UDF class.
-// See the comments in be/src/exprs/hive-udf-call.h for more details.
-// TODO: should we cache loaded jars and classes?
-@SuppressWarnings("restriction")
-public class UdfExecutor {
-  private static final Logger LOG = Logger.getLogger(UdfExecutor.class);
-  // By convention, the function in the class must be called evaluate()
-  public static final String UDF_FUNCTION_NAME = "evaluate";
-
-  // Object to deserialize ctor params from BE.
-  private final static TBinaryProtocol.Factory PROTOCOL_FACTORY =
-    new TBinaryProtocol.Factory();
-
-  private UDF udf_;
-  private Method method_;
-
-  // Return and argument types of the function inferred from the udf method signature.
-  // The JavaUdfDataType enum maps it to corresponding primitive type.
-  private JavaUdfDataType[] argTypes_;
-  private JavaUdfDataType retType_;
-
-  // Input buffer from the backend. This is valid for the duration of an evaluate() call.
-  // These buffers are allocated in the BE.
-  private final long inputBufferPtr_;
-  private final long inputNullsPtr_;
-
-  // Byte offsets into the buffer at inputBufferPtr_ of the start of each input argument,
-  // e.g. (inputBufferPtr_ + inputBufferOffsets_[i]) is the address of the ith argument.
-  private final int[] inputBufferOffsets_;
-
-  // Output buffer to return non-string values. These buffers are allocated in the BE.
-  private final long outputBufferPtr_;
-  private final long outputNullPtr_;
-
-  // For StringValue return types, outputBufferPtr_ is the location of the 16-byte
-  // StringValue object. StringValue.ptr is set to outBufferStringPtr_. This buffer
-  // grows as necessary to fit the return string.
-  // This is allocated from the FE.
-  private long outBufferStringPtr_;
-
-  // Size of outBufferStringPtr_.
-  private int outBufferCapacity_;
-
-  // Preconstructed input objects for the UDF. This minimizes object creation overhead
-  // as these objects are reused across calls to evaluate().
-  private Object[] inputObjects_;
-  private Object[] inputArgs_; // inputArgs_[i] is either inputObjects_[i] or null
-
-  // Data types that are supported as return or argument types in Java UDFs.
-  public enum JavaUdfDataType {
-    INVALID_TYPE("INVALID_TYPE", TPrimitiveType.INVALID_TYPE),
-    BOOLEAN("BOOLEAN", TPrimitiveType.BOOLEAN),
-    BOOLEAN_WRITABLE("BOOLEAN_WRITABLE", TPrimitiveType.BOOLEAN),
-    TINYINT("TINYINT", TPrimitiveType.TINYINT),
-    BYTE_WRITABLE("BYTE_WRITABLE", TPrimitiveType.TINYINT),
-    SMALLINT("SMALLINT", TPrimitiveType.SMALLINT),
-    SHORT_WRITABLE("SHORT_WRITABLE", TPrimitiveType.SMALLINT),
-    INT("INT", TPrimitiveType.INT),
-    INT_WRITABLE("INT_WRITABLE", TPrimitiveType.INT),
-    BIGINT("BIGINT", TPrimitiveType.BIGINT),
-    LONG_WRITABLE("LONG_WRITABLE", TPrimitiveType.BIGINT),
-    FLOAT("FLOAT", TPrimitiveType.FLOAT),
-    FLOAT_WRITABLE("FLOAT_WRITABLE", TPrimitiveType.FLOAT),
-    DOUBLE("DOUBLE", TPrimitiveType.DOUBLE),
-    DOUBLE_WRITABLE("DOUBLE", TPrimitiveType.DOUBLE),
-    STRING("STRING", TPrimitiveType.STRING),
-    TEXT("TEXT", TPrimitiveType.STRING),
-    BYTES_WRITABLE("BYTES_WRITABLE", TPrimitiveType.STRING),
-    BYTE_ARRAY("BYTE_ARRAY", TPrimitiveType.STRING);
-
-    private final String description_;
-    private final TPrimitiveType thriftType_;
-
-    private JavaUdfDataType(String description, TPrimitiveType thriftType) {
-      description_ = description;
-      thriftType_ = thriftType;
-    }
-
-    @Override
-    public String toString() { return description_; }
-
-    public TPrimitiveType getPrimitiveType() { return thriftType_; }
-
-    public static JavaUdfDataType getType(Class<?> c) {
-      if (c == BooleanWritable.class) {
-        return JavaUdfDataType.BOOLEAN_WRITABLE;
-      } else if (c == boolean.class || c == Boolean.class) {
-        return JavaUdfDataType.BOOLEAN;
-      } else if (c == ByteWritable.class) {
-        return JavaUdfDataType.BYTE_WRITABLE;
-      } else if (c == byte.class || c == Byte.class) {
-        return JavaUdfDataType.TINYINT;
-      } else if (c == ShortWritable.class) {
-        return JavaUdfDataType.SHORT_WRITABLE;
-      } else if (c == short.class || c == Short.class) {
-        return JavaUdfDataType.SMALLINT;
-      } else if (c == IntWritable.class) {
-        return JavaUdfDataType.INT_WRITABLE;
-      } else if (c == int.class || c == Integer.class) {
-        return JavaUdfDataType.INT;
-      } else if (c == LongWritable.class) {
-        return JavaUdfDataType.LONG_WRITABLE;
-      } else if (c == long.class || c == Long.class) {
-        return JavaUdfDataType.BIGINT;
-      } else if (c == FloatWritable.class) {
-        return JavaUdfDataType.FLOAT_WRITABLE;
-      } else if (c == float.class || c == Float.class) {
-        return JavaUdfDataType.FLOAT;
-      } else if (c == DoubleWritable.class) {
-        return JavaUdfDataType.DOUBLE_WRITABLE;
-      } else if (c == double.class || c == Double.class) {
-        return JavaUdfDataType.DOUBLE;
-      } else if (c == byte[].class) {
-        return JavaUdfDataType.BYTE_ARRAY;
-      } else if (c == BytesWritable.class) {
-        return JavaUdfDataType.BYTES_WRITABLE;
-      } else if (c == Text.class) {
-        return JavaUdfDataType.TEXT;
-      } else if (c == String.class) {
-        return JavaUdfDataType.STRING;
-      }
-      return JavaUdfDataType.INVALID_TYPE;
-    }
-
-    public static boolean isSupported(Type t) {
-      for(JavaUdfDataType javaType: JavaUdfDataType.values()) {
-        if (javaType == JavaUdfDataType.INVALID_TYPE) continue;
-        if (javaType.getPrimitiveType() == t.getPrimitiveType().toThrift()) {
-          return true;
-        }
-      }
-      return false;
-    }
-  }
-
-  /**
-   * Create a UdfExecutor, using parameters from a serialized thrift object. Used by
-   * the backend.
-   */
-  public UdfExecutor(byte[] thriftParams) throws ImpalaException {
-    THiveUdfExecutorCtorParams request = new THiveUdfExecutorCtorParams();
-    JniUtil.deserializeThrift(PROTOCOL_FACTORY, request, thriftParams);
-
-    String className = request.fn.scalar_fn.symbol;
-    String jarFile = request.local_location;
-    Type retType = Type.fromThrift(request.fn.ret_type);
-    Type[] parameterTypes = new Type[request.fn.arg_types.size()];
-    for (int i = 0; i < request.fn.arg_types.size(); ++i) {
-      parameterTypes[i] = Type.fromThrift(request.fn.arg_types.get(i));
-    }
-    inputBufferPtr_ = request.input_buffer_ptr;
-    inputNullsPtr_ = request.input_nulls_ptr;
-    outputBufferPtr_ = request.output_buffer_ptr;
-    outputNullPtr_ = request.output_null_ptr;
-    outBufferStringPtr_ = 0;
-    outBufferCapacity_ = 0;
-    inputBufferOffsets_ = new int[request.input_byte_offsets.size()];
-    for (int i = 0; i < request.input_byte_offsets.size(); ++i) {
-      inputBufferOffsets_[i] = request.input_byte_offsets.get(i).intValue();
-    }
-
-    init(jarFile, className, retType, parameterTypes);
-  }
-
-  @Override
-  protected void finalize() throws Throwable {
-    close();
-    super.finalize();
-  }
-
-  /**
-   * Releases any resources allocated off the native heap.
-   */
-  public void close() {
-    UnsafeUtil.UNSAFE.freeMemory(outBufferStringPtr_);
-    outBufferStringPtr_ = 0;
-    outBufferCapacity_ = 0;
-  }
-
-  /**
-   * Evaluate function called by the backend. The inputs to the UDF have
-   * been serialized into the input buffer at inputBufferPtr_.
-   */
-  public void evaluate() throws ImpalaRuntimeException {
-    try {
-      for (int i = 0; i < argTypes_.length; ++i) {
-        if (UnsafeUtil.UNSAFE.getByte(inputNullsPtr_ + i) == 0) {
-          switch (argTypes_[i]) {
-            case BOOLEAN_WRITABLE:
-            case BYTE_WRITABLE:
-            case SHORT_WRITABLE:
-            case INT_WRITABLE:
-            case LONG_WRITABLE:
-            case FLOAT_WRITABLE:
-            case DOUBLE_WRITABLE:
-            case BYTE_ARRAY:
-            case BYTES_WRITABLE:
-            case TEXT:
-              inputArgs_[i] = inputObjects_[i];
-              break;
-            case BOOLEAN:
-              inputArgs_[i] = ((ImpalaBooleanWritable)inputObjects_[i]).get();
-              break;
-            case TINYINT:
-              inputArgs_[i] = ((ImpalaTinyIntWritable)inputObjects_[i]).get();
-              break;
-            case SMALLINT:
-              inputArgs_[i] = ((ImpalaSmallIntWritable)inputObjects_[i]).get();
-              break;
-            case INT:
-              inputArgs_[i] = ((ImpalaIntWritable)inputObjects_[i]).get();
-              break;
-            case BIGINT:
-              inputArgs_[i] = ((ImpalaBigIntWritable)inputObjects_[i]).get();
-              break;
-            case FLOAT:
-              inputArgs_[i] = ((ImpalaFloatWritable)inputObjects_[i]).get();
-              break;
-            case DOUBLE:
-              inputArgs_[i] = ((ImpalaDoubleWritable)inputObjects_[i]).get();
-              break;
-            case STRING:
-              Preconditions.checkState(inputObjects_[i] instanceof ImpalaBytesWritable);
-              inputArgs_[i] =
-                  new String(((ImpalaBytesWritable)inputObjects_[i]).getBytes());
-              break;
-          }
-        } else {
-          inputArgs_[i] = null;
-        }
-      }
-      evaluate(inputArgs_);
-    } catch (Exception e) {
-      e.printStackTrace(System.err);
-      throw new ImpalaRuntimeException("UDF::evaluate() ran into a problem.", e);
-    }
-  }
-
-  /**
-   * Evaluates the UDF with 'args' as the input to the UDF. This is exposed
-   * for testing and not the version of evaluate() the backend uses.
-   */
-  public long evaluateForTesting(Object... args) throws ImpalaRuntimeException {
-    try {
-      Object[] inputArgs = new Object[args.length];
-      for (int i = 0; i < args.length; ++i) {
-        switch (argTypes_[i]) {
-          case BOOLEAN_WRITABLE:
-          case BYTE_WRITABLE:
-          case SHORT_WRITABLE:
-          case INT_WRITABLE:
-          case LONG_WRITABLE:
-          case FLOAT_WRITABLE:
-          case DOUBLE_WRITABLE:
-          case TEXT:
-          case BYTE_ARRAY:
-          case BYTES_WRITABLE:
-          case STRING:
-            inputArgs[i] = args[i];
-            break;
-          case BOOLEAN:
-            inputArgs[i] = ((ImpalaBooleanWritable)args[i]).get();
-            break;
-          case TINYINT:
-            inputArgs[i] = ((ImpalaTinyIntWritable)args[i]).get();
-            break;
-          case SMALLINT:
-            inputArgs[i] = ((ImpalaSmallIntWritable)args[i]).get();
-            break;
-          case INT:
-            inputArgs[i] = ((ImpalaIntWritable)args[i]).get();
-            break;
-          case BIGINT:
-            inputArgs[i] = ((ImpalaBigIntWritable)args[i]).get();
-            break;
-          case FLOAT:
-            inputArgs[i] = ((ImpalaFloatWritable)args[i]).get();
-            break;
-          case DOUBLE:
-            inputArgs[i] = ((ImpalaDoubleWritable)args[i]).get();
-            break;
-        }
-      }
-      return evaluate(inputArgs);
-    } catch (Exception e) {
-      e.printStackTrace(System.err);
-      throw new ImpalaRuntimeException("UDF::evaluate() ran into a problem.", e);
-    }
-  }
-
-  /**
-   * Evaluates the UDF with 'args' as the input to the UDF.
-   * Returns 0 if the udf returned NULL. (the result is a ptr so this is okay).
-   */
-  private long evaluate(Object... args) throws ImpalaRuntimeException {
-    try {
-      storeUdfResult(method_.invoke(udf_, args));
-      if (UnsafeUtil.UNSAFE.getByte(outputNullPtr_) == 1) return 0;
-      return outputBufferPtr_;
-    } catch (IllegalArgumentException e) {
-      throw new ImpalaRuntimeException("UDF failed to evaluate", e);
-    } catch (IllegalAccessException e) {
-      throw new ImpalaRuntimeException("UDF failed to evaluate", e);
-    } catch (InvocationTargetException e) {
-      throw new ImpalaRuntimeException("UDF failed to evaluate", e);
-    }
-  }
-
-  public Method getMethod() { return method_; }
-
-  // Sets the result object 'obj' into the outputBufferPtr_
-  private void storeUdfResult(Object obj) throws ImpalaRuntimeException {
-    if (obj == null) {
-      UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)1);
-      return;
-    }
-
-    UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)0);
-    switch (retType_) {
-      case BOOLEAN_WRITABLE: {
-        BooleanWritable val = (BooleanWritable)obj;
-        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get() ? (byte)1 : 0);
-        return;
-      }
-      case BOOLEAN: {
-        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, (boolean)obj ? (byte)1 : 0);
-        return;
-      }
-      case BYTE_WRITABLE: {
-        ByteWritable val = (ByteWritable)obj;
-        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get());
-        return;
-      }
-      case TINYINT: {
-        UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, (byte)obj);
-        return;
-      }
-      case SHORT_WRITABLE: {
-        ShortWritable val = (ShortWritable)obj;
-        UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, val.get());
-        return;
-      }
-      case SMALLINT: {
-        UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, (short)obj);
-        return;
-      }
-      case INT_WRITABLE: {
-        IntWritable val = (IntWritable)obj;
-        UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, val.get());
-        return;
-      }
-      case INT: {
-        UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, (int)obj);
-        return;
-      }
-      case LONG_WRITABLE: {
-        LongWritable val = (LongWritable)obj;
-        UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, val.get());
-        return;
-      }
-      case BIGINT: {
-        UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, (long)obj);
-        return;
-      }
-      case FLOAT_WRITABLE: {
-        FloatWritable val = (FloatWritable)obj;
-        UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, val.get());
-        return;
-      }
-      case FLOAT: {
-        UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, (float)obj);
-        return;
-      }
-      case DOUBLE_WRITABLE: {
-        DoubleWritable val = (DoubleWritable)obj;
-        UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, val.get());
-        return;
-      }
-      case DOUBLE: {
-        UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, (double)obj);
-        return;
-      }
-      case TEXT: {
-        copyBytesToOutputBuffer(((Text)obj).copyBytes());
-        return;
-      }
-      case BYTE_ARRAY: {
-        copyBytesToOutputBuffer((byte[]) obj);
-        return;
-      }
-      case BYTES_WRITABLE: {
-        copyBytesToOutputBuffer(((BytesWritable)obj).copyBytes());
-        return;
-      }
-      case STRING: {
-        copyBytesToOutputBuffer(((String)obj).getBytes());
-        return;
-      }
-      default:
-        throw new ImpalaRuntimeException("Unsupported return type: " + retType_);
-    }
-  }
-
-  private void copyBytesToOutputBuffer(byte[] bytes) {
-    if (bytes.length > outBufferCapacity_) {
-      outBufferStringPtr_ =
-          UnsafeUtil.UNSAFE.reallocateMemory(outBufferStringPtr_, bytes.length);
-      outBufferCapacity_ = bytes.length;
-      UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, outBufferStringPtr_);
-    }
-    UnsafeUtil.Copy(outBufferStringPtr_, bytes, 0, bytes.length);
-    UnsafeUtil.UNSAFE.putInt(
-        outputBufferPtr_ + ImpalaStringWritable.STRING_VALUE_LEN_OFFSET,
-        bytes.length);
-  }
-
-  // Preallocate the input objects that will be passed to the underlying UDF.
-  // These objects are allocated once and reused across calls to evaluate()
-  private void allocateInputObjects() throws ImpalaRuntimeException {
-    inputObjects_ = new Writable[argTypes_.length];
-    inputArgs_ = new Object[argTypes_.length];
-
-    for (int i = 0; i < argTypes_.length; ++i) {
-      int offset = inputBufferOffsets_[i];
-      switch (argTypes_[i]) {
-        case BOOLEAN:
-        case BOOLEAN_WRITABLE:
-          inputObjects_[i] = new ImpalaBooleanWritable(inputBufferPtr_ + offset);
-          break;
-        case TINYINT:
-        case BYTE_WRITABLE:
-          inputObjects_[i] = new ImpalaTinyIntWritable(inputBufferPtr_ + offset);
-          break;
-        case SMALLINT:
-        case SHORT_WRITABLE:
-          inputObjects_[i] = new ImpalaSmallIntWritable(inputBufferPtr_ + offset);
-          break;
-        case INT:
-        case INT_WRITABLE:
-          inputObjects_[i] = new ImpalaIntWritable(inputBufferPtr_ + offset);
-          break;
-        case BIGINT:
-        case LONG_WRITABLE:
-          inputObjects_[i] = new ImpalaBigIntWritable(inputBufferPtr_ + offset);
-          break;
-        case FLOAT:
-        case FLOAT_WRITABLE:
-          inputObjects_[i] = new ImpalaFloatWritable(inputBufferPtr_ + offset);
-          break;
-        case DOUBLE:
-        case DOUBLE_WRITABLE:
-          inputObjects_[i] = new ImpalaDoubleWritable(inputBufferPtr_ + offset);
-          break;
-        case TEXT:
-          inputObjects_[i] = new ImpalaTextWritable(inputBufferPtr_ + offset);
-          break;
-        case BYTES_WRITABLE:
-          inputObjects_[i] = new ImpalaBytesWritable(inputBufferPtr_ + offset);
-          break;
-        case STRING:
-          // String can be mapped to any String-like Writable class.
-          inputObjects_[i] = new ImpalaBytesWritable(inputBufferPtr_ + offset);
-          break;
-        default:
-          throw new ImpalaRuntimeException("Unsupported argument type: " + argTypes_[i]);
-      }
-    }
-  }
-
-  private ClassLoader getClassLoader(String jarPath) throws MalformedURLException {
-    if (jarPath == null) {
-      return ClassLoader.getSystemClassLoader();
-    } else {
-      URL url = new File(jarPath).toURI().toURL();
-      return URLClassLoader.newInstance(new URL[] { url }, getClass().getClassLoader());
-    }
-  }
-
-  /**
-   * Sets the return type of a Java UDF. Returns true if the return type is compatible
-   * with the return type from the function definition. Throws an ImpalaRuntimeException
-   * if the return type is not supported.
-   */
-  private boolean setReturnType(Type retType, Class<?> udfReturnType)
-      throws ImpalaRuntimeException {
-    if (!JavaUdfDataType.isSupported(retType)) {
-      throw new ImpalaRuntimeException("Unsupported return type: " + retType.toSql());
-    }
-    JavaUdfDataType javaType = JavaUdfDataType.getType(udfReturnType);
-    // Check if the evaluate method return type is compatible with the return type from
-    // the function definition. This happens when both of them map to the same primitive
-    // type.
-    if (retType.getPrimitiveType().toThrift() != javaType.getPrimitiveType()) {
-      return false;
-    }
-    retType_ = javaType;
-    return true;
-  }
-
-  /**
-   * Sets the argument types of a Java UDF. Returns true if the argument types specified
-   * in the UDF are compatible with the argument types of the evaluate() function loaded
-   * from the associated JAR file.
-   */
-  private boolean setArgTypes(Type[] parameterTypes, Class<?>[] udfArgTypes) {
-    Preconditions.checkNotNull(argTypes_);
-    for (int i = 0; i < udfArgTypes.length; ++i) {
-      argTypes_[i] = JavaUdfDataType.getType(udfArgTypes[i]);
-      if (argTypes_[i].getPrimitiveType()
-          != parameterTypes[i].getPrimitiveType().toThrift()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Initializes the UdfExecutor validating the UDF has the proper signature.
-   * This uses reflection to look up the "evaluate" function in the UDF class.
-   */
-  private void init(String jarPath, String udfPath,
-      Type retType, Type... parameterTypes) throws
-      ImpalaRuntimeException {
-    ArrayList<String> signatures = Lists.newArrayList();
-    try {
-      LOG.debug("Loading UDF '" + udfPath + "' from " + jarPath);
-      ClassLoader loader = getClassLoader(jarPath);
-      Class<?> c = Class.forName(udfPath, true, loader);
-      Class<? extends UDF> udfClass = c.asSubclass(UDF.class);
-      Constructor<? extends UDF> ctor = udfClass.getConstructor();
-      udf_ = ctor.newInstance();
-      argTypes_ = new JavaUdfDataType[parameterTypes.length];
-      Method[] methods = udfClass.getMethods();
-      for (Method m: methods) {
-        // By convention, the udf must contain the function "evaluate"
-        if (!m.getName().equals(UDF_FUNCTION_NAME)) continue;
-        signatures.add(m.toGenericString());
-        Class<?>[] methodTypes = m.getParameterTypes();
-
-        // Try to match the arguments
-        if (methodTypes.length != parameterTypes.length) continue;
-        if (methodTypes.length == 0 && parameterTypes.length == 0) {
-          // Special case where the UDF doesn't take any input args
-          method_ = m;
-          if (!setReturnType(retType, m.getReturnType())) continue;
-          setArgTypes(parameterTypes, methodTypes);
-          LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath);
-          return;
-        }
-
-        method_ = m;
-        if (!setReturnType(retType, m.getReturnType())) continue;
-        if (!setArgTypes(parameterTypes, methodTypes)) continue;
-        allocateInputObjects();
-        LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath);
-        return;
-      }
-
-      StringBuilder sb = new StringBuilder();
-      sb.append("Unable to find evaluate function with the correct signature: ")
-        .append(udfPath + ".evaluate(")
-        .append(Joiner.on(", ").join(parameterTypes))
-        .append(")\n")
-        .append("UDF contains: \n    ")
-        .append(Joiner.on("\n    ").join(signatures));
-      throw new ImpalaRuntimeException(sb.toString());
-    } catch (MalformedURLException e) {
-      throw new ImpalaRuntimeException("Unable load jar.", e);
-    } catch (SecurityException e) {
-      throw new ImpalaRuntimeException("Unable to load function.", e);
-    } catch (ClassNotFoundException e) {
-      throw new ImpalaRuntimeException("Unable to find class.", e);
-    } catch (NoSuchMethodException e) {
-      throw new ImpalaRuntimeException(
-          "Unable to find constructor with no arguments.", e);
-    } catch (IllegalArgumentException e) {
-      throw new ImpalaRuntimeException(
-          "Unable to call UDF constructor with no arguments.", e);
-    } catch (InstantiationException e) {
-      throw new ImpalaRuntimeException("Unable to call create UDF instance.", e);
-    } catch (IllegalAccessException e) {
-      throw new ImpalaRuntimeException("Unable to call create UDF instance.", e);
-    } catch (InvocationTargetException e) {
-      throw new ImpalaRuntimeException("Unable to call create UDF instance.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java b/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java
deleted file mode 100644
index f6bf8a0..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java
+++ /dev/null
@@ -1,292 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.AggregateInfo;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.FunctionCallExpr;
-import com.cloudera.impala.analysis.SlotId;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.thrift.TAggregationNode;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TExpr;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Aggregation computation.
- *
- */
-public class AggregationNode extends PlanNode {
-  private final static Logger LOG = LoggerFactory.getLogger(AggregationNode.class);
-
-  // Default per-host memory requirement used if no valid stats are available.
-  // TODO: Come up with a more useful heuristic.
-  private final static long DEFAULT_PER_HOST_MEM = 128L * 1024L * 1024L;
-
-  // Conservative minimum size of hash table for low-cardinality aggregations.
-  private final static long MIN_HASH_TBL_MEM = 10L * 1024L * 1024L;
-
-  private final AggregateInfo aggInfo_;
-
-  // Set to true if this aggregation node needs to run the Finalize step. This
-  // node is the root node of a distributed aggregation.
-  private boolean needsFinalize_;
-
-  // If true, use streaming preaggregation algorithm. Not valid if this is a merge agg.
-  private boolean useStreamingPreagg_;
-
-  /**
-   * Create an agg node from aggInfo.
-   */
-  public AggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo) {
-    super(id, aggInfo.getOutputTupleId().asList(), "AGGREGATE");
-    aggInfo_ = aggInfo;
-    children_.add(input);
-    needsFinalize_ = true;
-  }
-
-  /**
-   * Copy c'tor used in clone().
-   */
-  private AggregationNode(PlanNodeId id, AggregationNode src) {
-    super(id, src, "AGGREGATE");
-    aggInfo_ = src.aggInfo_;
-    needsFinalize_ = src.needsFinalize_;
-  }
-
-  public AggregateInfo getAggInfo() { return aggInfo_; }
-
-  /**
-   * Unsets this node as requiring finalize. Only valid to call this if it is
-   * currently marked as needing finalize.
-   */
-  public void unsetNeedsFinalize() {
-    Preconditions.checkState(needsFinalize_);
-    needsFinalize_ = false;
-  }
-
-  /**
-   * Sets this node as a preaggregation. Only valid to call this if it is not marked
-   * as a preaggregation
-   */
-  public void setIsPreagg(PlannerContext ctx_) {
-    TQueryOptions query_options = ctx_.getQueryOptions();
-    useStreamingPreagg_ =  !query_options.disable_streaming_preaggregations &&
-        aggInfo_.getGroupingExprs().size() > 0;
-  }
-
-  /**
-   * Have this node materialize the aggregation's intermediate tuple instead of
-   * the output tuple.
-   */
-  public void setIntermediateTuple() {
-    Preconditions.checkState(!tupleIds_.isEmpty());
-    Preconditions.checkState(tupleIds_.get(0).equals(aggInfo_.getOutputTupleId()));
-    tupleIds_.clear();
-    tupleIds_.add(aggInfo_.getIntermediateTupleId());
-  }
-
-  @Override
-  public boolean isBlockingNode() { return !useStreamingPreagg_; }
-
-  @Override
-  public void init(Analyzer analyzer) throws InternalException {
-    // Assign predicates to the top-most agg in the single-node plan that can evaluate
-    // them, as follows: For non-distinct aggs place them in the 1st phase agg node. For
-    // distinct aggs place them in the 2nd phase agg node. The conjuncts are
-    // transferred to the proper place in the multi-node plan via transferConjuncts().
-    if (tupleIds_.get(0).equals(aggInfo_.getResultTupleId()) && !aggInfo_.isMerge()) {
-      // Ignore predicates bound by a grouping slot produced by a SlotRef grouping expr.
-      // Those predicates are already evaluated below this agg node (e.g., in a scan),
-      // because the grouping slot must be in the same equivalence class as another slot
-      // below this agg node. We must not ignore other grouping slots in order to retain
-      // conjuncts bound by those grouping slots in createEquivConjuncts() (IMPALA-2089).
-      // Those conjuncts cannot be redundant because our equivalence classes do not
-      // capture dependencies with non-SlotRef exprs.
-      Set<SlotId> groupBySlots = Sets.newHashSet();
-      for (int i = 0; i < aggInfo_.getGroupingExprs().size(); ++i) {
-        if (aggInfo_.getGroupingExprs().get(i).unwrapSlotRef(true) == null) continue;
-        groupBySlots.add(aggInfo_.getOutputTupleDesc().getSlots().get(i).getId());
-      }
-      ArrayList<Expr> bindingPredicates =
-          analyzer.getBoundPredicates(tupleIds_.get(0), groupBySlots, true);
-      conjuncts_.addAll(bindingPredicates);
-
-      // also add remaining unassigned conjuncts_
-      assignConjuncts(analyzer);
-
-      analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_, groupBySlots);
-    }
-    conjuncts_ = orderConjunctsByCost(conjuncts_);
-    // Compute the mem layout for both tuples here for simplicity.
-    aggInfo_.getOutputTupleDesc().computeMemLayout();
-    aggInfo_.getIntermediateTupleDesc().computeMemLayout();
-
-    // do this at the end so it can take all conjuncts into account
-    computeStats(analyzer);
-
-    // don't call createDefaultSMap(), it would point our conjuncts (= Having clause)
-    // to our input; our conjuncts don't get substituted because they already
-    // refer to our output
-    outputSmap_ = getCombinedChildSmap();
-    aggInfo_.substitute(outputSmap_, analyzer);
-    // assert consistent aggregate expr and slot materialization
-    aggInfo_.checkConsistency();
-  }
-
-  @Override
-  public void computeStats(Analyzer analyzer) {
-    super.computeStats(analyzer);
-    // This is prone to overflow, because we keep multiplying cardinalities,
-    // even if the grouping exprs are functionally dependent (example:
-    // group by the primary key of a table plus a number of other columns from that
-    // same table)
-    // TODO: try to recognize functional dependencies
-    // TODO: as a shortcut, instead of recognizing functional dependencies,
-    // limit the contribution of a single table to the number of rows
-    // of that table (so that when we're grouping by the primary key col plus
-    // some others, the estimate doesn't overshoot dramatically)
-    // cardinality: product of # of distinct values produced by grouping exprs
-
-    // Any non-grouping aggregation has at least one distinct value
-    cardinality_ = aggInfo_.getGroupingExprs().isEmpty() ? 1 :
-      Expr.getNumDistinctValues(aggInfo_.getGroupingExprs());
-    // take HAVING predicate into account
-    LOG.trace("Agg: cardinality=" + Long.toString(cardinality_));
-    if (cardinality_ > 0) {
-      cardinality_ = Math.round((double) cardinality_ * computeSelectivity());
-      LOG.trace("sel=" + Double.toString(computeSelectivity()));
-    }
-    // if we ended up with an overflow, the estimate is certain to be wrong
-    if (cardinality_ < 0) cardinality_ = -1;
-    // Sanity check the cardinality_ based on the input cardinality_.
-    if (getChild(0).getCardinality() != -1) {
-      if (cardinality_ == -1) {
-        // A worst-case cardinality_ is better than an unknown cardinality_.
-        cardinality_ = getChild(0).getCardinality();
-      } else {
-        // An AggregationNode cannot increase the cardinality_.
-        cardinality_ = Math.min(getChild(0).getCardinality(), cardinality_);
-      }
-    }
-    cardinality_ = capAtLimit(cardinality_);
-    LOG.trace("stats Agg: cardinality=" + Long.toString(cardinality_));
-  }
-
-  @Override
-  protected String debugString() {
-    return Objects.toStringHelper(this)
-        .add("aggInfo", aggInfo_.debugString())
-        .addValue(super.debugString())
-        .toString();
-  }
-
-  @Override
-  protected void toThrift(TPlanNode msg) {
-    msg.node_type = TPlanNodeType.AGGREGATION_NODE;
-
-    List<TExpr> aggregateFunctions = Lists.newArrayList();
-    // only serialize agg exprs that are being materialized
-    for (FunctionCallExpr e: aggInfo_.getMaterializedAggregateExprs()) {
-      aggregateFunctions.add(e.treeToThrift());
-    }
-    aggInfo_.checkConsistency();
-    msg.agg_node = new TAggregationNode(
-        aggregateFunctions,
-        aggInfo_.getIntermediateTupleId().asInt(),
-        aggInfo_.getOutputTupleId().asInt(), needsFinalize_,
-        useStreamingPreagg_,
-        getChild(0).getCardinality());
-    List<Expr> groupingExprs = aggInfo_.getGroupingExprs();
-    if (groupingExprs != null) {
-      msg.agg_node.setGrouping_exprs(Expr.treesToThrift(groupingExprs));
-    }
-  }
-
-  @Override
-  protected String getDisplayLabelDetail() {
-    if (useStreamingPreagg_) return "STREAMING";
-    if (needsFinalize_) return "FINALIZE";
-    return null;
-  }
-
-  @Override
-  protected String getNodeExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
-    String nameDetail = getDisplayLabelDetail();
-    output.append(String.format("%s%s", prefix, getDisplayLabel()));
-    if (nameDetail != null) output.append(" [" + nameDetail + "]");
-    output.append("\n");
-
-    if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
-      ArrayList<FunctionCallExpr> aggExprs = aggInfo_.getMaterializedAggregateExprs();
-      if (!aggExprs.isEmpty()) {
-        output.append(detailPrefix + "output: ")
-            .append(getExplainString(aggExprs) + "\n");
-      }
-      // TODO: is this the best way to display this. It currently would
-      // have DISTINCT_PC(DISTINCT_PC(col)) for the merge phase but not
-      // very obvious what that means if you don't already know.
-
-      // TODO: group by can be very long. Break it into multiple lines
-      if (!aggInfo_.getGroupingExprs().isEmpty()) {
-        output.append(detailPrefix + "group by: ")
-            .append(getExplainString(aggInfo_.getGroupingExprs()) + "\n");
-      }
-      if (!conjuncts_.isEmpty()) {
-        output.append(detailPrefix + "having: ")
-            .append(getExplainString(conjuncts_) + "\n");
-      }
-    }
-    return output.toString();
-  }
-
-  @Override
-  public void computeCosts(TQueryOptions queryOptions) {
-    Preconditions.checkNotNull(fragment_,
-        "PlanNode must be placed into a fragment before calling this method.");
-    perHostMemCost_ = 0;
-    long perHostCardinality = fragment_.getNumDistinctValues(aggInfo_.getGroupingExprs());
-    if (perHostCardinality == -1) {
-      perHostMemCost_ = DEFAULT_PER_HOST_MEM;
-      return;
-    }
-
-    // Per-host cardinality cannot be greater than the total output cardinality.
-    if (cardinality_ != -1) {
-      perHostCardinality = Math.min(perHostCardinality, cardinality_);
-    }
-    perHostMemCost_ += Math.max(perHostCardinality * avgRowSize_ *
-        PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java
deleted file mode 100644
index ccbdaa2..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java
+++ /dev/null
@@ -1,249 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.AnalyticWindow;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.ExprSubstitutionMap;
-import com.cloudera.impala.analysis.OrderByElement;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.thrift.TAnalyticNode;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Computation of analytic exprs.
- */
-public class AnalyticEvalNode extends PlanNode {
-  private final static Logger LOG = LoggerFactory.getLogger(AnalyticEvalNode.class);
-
-  private List<Expr> analyticFnCalls_;
-
-  // Partitioning exprs from the AnalyticInfo
-  private final List<Expr> partitionExprs_;
-
-  // TODO: Remove when the BE uses partitionByLessThan rather than the exprs
-  private List<Expr> substitutedPartitionExprs_;
-  private List<OrderByElement> orderByElements_;
-  private final AnalyticWindow analyticWindow_;
-
-  // Physical tuples used/produced by this analytic node.
-  private final TupleDescriptor intermediateTupleDesc_;
-  private final TupleDescriptor outputTupleDesc_;
-
-  // maps from the logical output slots in logicalTupleDesc_ to their corresponding
-  // physical output slots in outputTupleDesc_
-  private final ExprSubstitutionMap logicalToPhysicalSmap_;
-
-  // predicates constructed from partitionExprs_/orderingExprs_ to
-  // compare input to buffered tuples
-  private final Expr partitionByEq_;
-  private final Expr orderByEq_;
-  private final TupleDescriptor bufferedTupleDesc_;
-
-  public AnalyticEvalNode(
-      PlanNodeId id, PlanNode input, List<Expr> analyticFnCalls,
-      List<Expr> partitionExprs, List<OrderByElement> orderByElements,
-      AnalyticWindow analyticWindow, TupleDescriptor intermediateTupleDesc,
-      TupleDescriptor outputTupleDesc, ExprSubstitutionMap logicalToPhysicalSmap,
-      Expr partitionByEq, Expr orderByEq, TupleDescriptor bufferedTupleDesc) {
-    super(id, "ANALYTIC");
-    Preconditions.checkState(!tupleIds_.contains(outputTupleDesc.getId()));
-    analyticFnCalls_ = analyticFnCalls;
-    partitionExprs_ = partitionExprs;
-    orderByElements_ = orderByElements;
-    analyticWindow_ = analyticWindow;
-    intermediateTupleDesc_ = intermediateTupleDesc;
-    outputTupleDesc_ = outputTupleDesc;
-    logicalToPhysicalSmap_ = logicalToPhysicalSmap;
-    partitionByEq_ = partitionByEq;
-    orderByEq_ = orderByEq;
-    bufferedTupleDesc_ = bufferedTupleDesc;
-    children_.add(input);
-    computeTupleIds();
-  }
-
-  @Override
-  public void computeTupleIds() {
-    clearTupleIds();
-    tupleIds_.addAll(getChild(0).getTupleIds());
-    // we're materializing the input row augmented with the analytic output tuple
-    tupleIds_.add(outputTupleDesc_.getId());
-    nullableTupleIds_.addAll(getChild(0).getNullableTupleIds());
-  }
-
-  @Override
-  public boolean isBlockingNode() { return true; }
-  public List<Expr> getPartitionExprs() { return partitionExprs_; }
-  public List<OrderByElement> getOrderByElements() { return orderByElements_; }
-
-  @Override
-  public void init(Analyzer analyzer) {
-    Preconditions.checkState(conjuncts_.isEmpty());
-    computeMemLayout(analyzer);
-    intermediateTupleDesc_.computeMemLayout();
-
-    // we add the analyticInfo's smap to the combined smap of our child
-    outputSmap_ = logicalToPhysicalSmap_;
-    createDefaultSmap(analyzer);
-
-    // Do not assign any conjuncts here: the conjuncts out of our SelectStmt's
-    // Where clause have already been assigned, and conjuncts coming out of an
-    // enclosing scope need to be evaluated *after* all analytic computations.
-
-    // do this at the end so it can take all conjuncts into account
-    computeStats(analyzer);
-
-    LOG.trace("desctbl: " + analyzer.getDescTbl().debugString());
-
-    // point fn calls, partition and ordering exprs at our input
-    ExprSubstitutionMap childSmap = getCombinedChildSmap();
-    analyticFnCalls_ = Expr.substituteList(analyticFnCalls_, childSmap, analyzer, false);
-    substitutedPartitionExprs_ = Expr.substituteList(partitionExprs_, childSmap,
-        analyzer, false);
-    orderByElements_ = OrderByElement.substitute(orderByElements_, childSmap, analyzer);
-    LOG.trace("evalnode: " + debugString());
-  }
-
-  @Override
-  protected void computeStats(Analyzer analyzer) {
-    super.computeStats(analyzer);
-    cardinality_ = getChild(0).cardinality_;
-    cardinality_ = capAtLimit(cardinality_);
-  }
-
-  @Override
-  protected String debugString() {
-    List<String> orderByElementStrs = Lists.newArrayList();
-    for (OrderByElement element: orderByElements_) {
-      orderByElementStrs.add(element.toSql());
-    }
-    return Objects.toStringHelper(this)
-        .add("analyticFnCalls", Expr.debugString(analyticFnCalls_))
-        .add("partitionExprs", Expr.debugString(partitionExprs_))
-        .add("subtitutedPartitionExprs", Expr.debugString(substitutedPartitionExprs_))
-        .add("orderByElements", Joiner.on(", ").join(orderByElementStrs))
-        .add("window", analyticWindow_)
-        .add("intermediateTid", intermediateTupleDesc_.getId())
-        .add("outputTid", outputTupleDesc_.getId())
-        .add("partitionByEq",
-            partitionByEq_ != null ? partitionByEq_.debugString() : "null")
-        .add("orderByEq",
-            orderByEq_ != null ? orderByEq_.debugString() : "null")
-        .addValue(super.debugString())
-        .toString();
-  }
-
-  @Override
-  protected void toThrift(TPlanNode msg) {
-    msg.node_type = TPlanNodeType.ANALYTIC_EVAL_NODE;
-    msg.analytic_node = new TAnalyticNode();
-    msg.analytic_node.setIntermediate_tuple_id(intermediateTupleDesc_.getId().asInt());
-    msg.analytic_node.setOutput_tuple_id(outputTupleDesc_.getId().asInt());
-    msg.analytic_node.setPartition_exprs(Expr.treesToThrift(substitutedPartitionExprs_));
-    msg.analytic_node.setOrder_by_exprs(
-        Expr.treesToThrift(OrderByElement.getOrderByExprs(orderByElements_)));
-    msg.analytic_node.setAnalytic_functions(Expr.treesToThrift(analyticFnCalls_));
-    if (analyticWindow_ == null) {
-      if (!orderByElements_.isEmpty()) {
-        msg.analytic_node.setWindow(AnalyticWindow.DEFAULT_WINDOW.toThrift());
-      }
-    } else {
-      // TODO: Window boundaries should have range_offset_predicate set
-      msg.analytic_node.setWindow(analyticWindow_.toThrift());
-    }
-    if (partitionByEq_ != null) {
-      msg.analytic_node.setPartition_by_eq(partitionByEq_.treeToThrift());
-    }
-    if (orderByEq_ != null) {
-      msg.analytic_node.setOrder_by_eq(orderByEq_.treeToThrift());
-    }
-    if (bufferedTupleDesc_ != null) {
-      msg.analytic_node.setBuffered_tuple_id(bufferedTupleDesc_.getId().asInt());
-    }
-  }
-
-  @Override
-  protected String getNodeExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
-    output.append(String.format("%s%s", prefix, getDisplayLabel()));
-    output.append("\n");
-    if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
-      output.append(detailPrefix + "functions: ");
-      List<String> strings = Lists.newArrayList();
-      for (Expr fnCall: analyticFnCalls_) {
-        strings.add(fnCall.toSql());
-      }
-      output.append(Joiner.on(", ").join(strings));
-      output.append("\n");
-
-      if (!partitionExprs_.isEmpty()) {
-        output.append(detailPrefix + "partition by: ");
-        strings.clear();
-        for (Expr partitionExpr: partitionExprs_) {
-          strings.add(partitionExpr.toSql());
-        }
-        output.append(Joiner.on(", ").join(strings));
-        output.append("\n");
-      }
-
-      if (!orderByElements_.isEmpty()) {
-        output.append(detailPrefix + "order by: ");
-        strings.clear();
-        for (OrderByElement element: orderByElements_) {
-          strings.add(element.toSql());
-        }
-        output.append(Joiner.on(", ").join(strings));
-        output.append("\n");
-      }
-
-      if (analyticWindow_ != null) {
-        output.append(detailPrefix + "window: ");
-        output.append(analyticWindow_.toSql());
-        output.append("\n");
-      }
-
-      if (!conjuncts_.isEmpty()) {
-        output.append(
-            detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n");
-      }
-    }
-    return output.toString();
-  }
-
-  @Override
-  public void computeCosts(TQueryOptions queryOptions) {
-    Preconditions.checkNotNull(fragment_,
-        "PlanNode must be placed into a fragment before calling this method.");
-    // TODO: come up with estimate based on window
-    perHostMemCost_ = 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java
deleted file mode 100644
index c02096e..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java
+++ /dev/null
@@ -1,815 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.AggregateInfoBase;
-import com.cloudera.impala.analysis.AnalyticExpr;
-import com.cloudera.impala.analysis.AnalyticInfo;
-import com.cloudera.impala.analysis.AnalyticWindow;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.BoolLiteral;
-import com.cloudera.impala.analysis.CompoundPredicate;
-import com.cloudera.impala.analysis.CompoundPredicate.Operator;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.ExprSubstitutionMap;
-import com.cloudera.impala.analysis.IsNullPredicate;
-import com.cloudera.impala.analysis.OrderByElement;
-import com.cloudera.impala.analysis.SlotDescriptor;
-import com.cloudera.impala.analysis.SlotRef;
-import com.cloudera.impala.analysis.SortInfo;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.analysis.TupleId;
-import com.cloudera.impala.analysis.TupleIsNullPredicate;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.thrift.TPartitionType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-
-/**
- * The analytic planner adds plan nodes to an existing plan tree in order to
- * implement the AnalyticInfo of a given query stmt. The resulting plan reflects
- * similarities among analytic exprs with respect to partitioning, ordering and
- * windowing to reduce data exchanges and sorts (the exchanges and sorts are currently
- * not minimal). The generated plan has the following structure:
- * ...
- * (
- *  (
- *    (
- *      analytic node  <-- group of analytic exprs with compatible window
- *    )+               <-- group of analytic exprs with compatible ordering
- *    sort node?
- *  )+                 <-- group of analytic exprs with compatible partitioning
- *  hash exchange?
- * )*                  <-- group of analytic exprs that have different partitioning
- * input plan node
- * ...
- */
-public class AnalyticPlanner {
-  private final static Logger LOG = LoggerFactory.getLogger(AnalyticPlanner.class);
-
-  private final AnalyticInfo analyticInfo_;
-  private final Analyzer analyzer_;
-  private final PlannerContext ctx_;
-
-  /**
-   * Creates a planner for 'analyticInfo'. The given analyzer and planner context are
-   * stored in this planner's fields.
-   */
-  public AnalyticPlanner(AnalyticInfo analyticInfo, Analyzer analyzer,
-      PlannerContext ctx) {
-    ctx_ = ctx;
-    analyzer_ = analyzer;
-    analyticInfo_ = analyticInfo;
-  }
-
-  /**
-   * Augments 'root' with the plan nodes needed for single-node evaluation of the
-   * AnalyticExprs in analyticInfo and returns the new root of the plan tree.
-   * The plan takes into account a possible hash partition of its input on
-   * 'groupingExprs': if 'groupingExprs' is non-null, 'inputPartitionExprs' must be
-   * non-null as well and receives a subset of the grouping exprs which should be used
-   * for the aggregate hash partitioning during the parallelization of 'root'.
-   * TODO: when generating sort orders for the sort groups, optimize the ordering
-   * of the partition exprs (so that subsequent sort operations see the input sorted
-   * on a prefix of their required sort exprs)
-   * TODO: when merging sort groups, recognize equivalent exprs
-   * (using the equivalence classes) rather than looking for expr equality
-   */
-  public PlanNode createSingleNodePlan(PlanNode root,
-      List<Expr> groupingExprs, List<Expr> inputPartitionExprs) throws ImpalaException {
-    // Window groups are initialized with a positional name: "wg-0", "wg-1", ...
-    List<WindowGroup> windowGroups = collectWindowGroups();
-    int windowGroupIdx = 0;
-    for (WindowGroup windowGroup: windowGroups) {
-      windowGroup.init(analyzer_, "wg-" + windowGroupIdx);
-      ++windowGroupIdx;
-    }
-
-    // Sort groups are merged first and initialized afterwards.
-    List<SortGroup> sortGroups = collectSortGroups(windowGroups);
-    mergeSortGroups(sortGroups);
-    for (SortGroup sortGroup: sortGroups) {
-      sortGroup.init();
-    }
-
-    List<PartitionGroup> partitionGroups = collectPartitionGroups(sortGroups);
-    mergePartitionGroups(partitionGroups, root.getNumNodes());
-    orderGroups(partitionGroups);
-    if (groupingExprs != null) {
-      Preconditions.checkNotNull(inputPartitionExprs);
-      computeInputPartitionExprs(
-          partitionGroups, groupingExprs, root.getNumNodes(), inputPartitionExprs);
-    }
-
-    // Stack one sort group plan on top of the other. Only the first sort group of each
-    // partition group is handed the partition-by exprs; the rest get null.
-    PlanNode planRoot = root;
-    for (PartitionGroup partitionGroup: partitionGroups) {
-      int numSortGroups = partitionGroup.sortGroups.size();
-      for (int sortGroupIdx = 0; sortGroupIdx < numSortGroups; ++sortGroupIdx) {
-        planRoot = createSortGroupPlan(planRoot,
-            partitionGroup.sortGroups.get(sortGroupIdx),
-            sortGroupIdx == 0 ? partitionGroup.partitionByExprs : null);
-      }
-    }
-
-    // create equiv classes for newly added slots
-    analyzer_.createIdentityEquivClasses();
-    return planRoot;
-  }
-
-  /**
-   * Coalesce sort groups that have compatible partition-by exprs and
-   * have a prefix relationship. Repeatedly lets the first sort group that is a prefix
-   * of another one absorb it, until no such pair is left. Absorbed groups are removed
-   * from 'sortGroups'.
-   */
-  private void mergeSortGroups(List<SortGroup> sortGroups) {
-    boolean mergedPair;
-    do {
-      mergedPair = false;
-      // The list is modified on a merge, so the scan is abandoned right away and
-      // restarted from the beginning by the enclosing loop.
-      scan:
-      for (SortGroup absorber: sortGroups) {
-        for (SortGroup candidate: sortGroups) {
-          if (absorber == candidate || !absorber.isPrefixOf(candidate)) continue;
-          absorber.absorb(candidate);
-          sortGroups.remove(candidate);
-          mergedPair = true;
-          break scan;
-        }
-      }
-    } while (mergedPair);
-  }
-
-  /**
-   * Coalesce partition groups for which the intersection of their
-   * partition exprs has an ndv estimate of at least numNodes, so that the resulting
-   * plan still parallelizes across all nodes. Merged-in groups are removed from
-   * 'partitionGroups'.
-   */
-  private void mergePartitionGroups(
-      List<PartitionGroup> partitionGroups, int numNodes) {
-    boolean mergedPair;
-    do {
-      mergedPair = false;
-      // The list is modified on a merge, so the scan is abandoned right away and
-      // restarted from the beginning by the enclosing loop.
-      scan:
-      for (PartitionGroup target: partitionGroups) {
-        for (PartitionGroup other: partitionGroups) {
-          if (target == other) continue;
-          long sharedNdv = Expr.getNumDistinctValues(
-              Expr.intersect(target.partitionByExprs, other.partitionByExprs));
-          // didn't get a usable value or the number of partitions is too small
-          if (sharedNdv < 0 || sharedNdv < numNodes) continue;
-          target.merge(other);
-          partitionGroups.remove(other);
-          mergedPair = true;
-          break scan;
-        }
-      }
-    } while (mergedPair);
-  }
-
-  /**
-   * Determine the partition group that has the maximum intersection in terms
-   * of the estimated ndv of the partition exprs with groupingExprs.
-   * If that ndv exceeds numNodes, the partition group is placed at the front of
-   * partitionGroups, with its partition exprs reduced to the intersection, and the
-   * intersecting groupingExprs are returned in inputPartitionExprs.
-   * Otherwise inputPartitionExprs is left empty and no partition group is modified.
-   * 'numNodes' must not be -1.
-   */
-  private void computeInputPartitionExprs(List<PartitionGroup> partitionGroups,
-      List<Expr> groupingExprs, int numNodes, List<Expr> inputPartitionExprs) {
-    inputPartitionExprs.clear();
-    Preconditions.checkState(numNodes != -1);
-    // find partition group with maximum intersection
-    long maxNdv = 0;
-    PartitionGroup maxPg = null;
-    List<Expr> maxPartitionExprs = null;
-    List<Expr> maxGroupingExprs = null;
-    for (PartitionGroup pg: partitionGroups) {
-      List<Expr> l1 = Lists.newArrayList();
-      List<Expr> l2 = Lists.newArrayList();
-      Expr.intersect(analyzer_, pg.partitionByExprs, groupingExprs,
-          analyzer_.getEquivClassSmap(), l1, l2);
-      // TODO: also look at l2 and take the max?
-      long ndv = Expr.getNumDistinctValues(l1);
-      if (ndv < 0 || ndv < numNodes || ndv < maxNdv) continue;
-      // Found a better partition group. Only remember the intersection here: writing
-      // it into the group right away would also shrink the partition exprs of
-      // candidates that are superseded later or that end up not being used at all.
-      maxPg = pg;
-      maxPartitionExprs = l1;
-      maxGroupingExprs = l2;
-      maxNdv = ndv;
-    }
-
-    if (maxNdv > numNodes) {
-      Preconditions.checkNotNull(maxPg);
-      // we found a partition group that gives us enough parallelism;
-      // reduce its partition exprs to the intersection and move it to the front
-      maxPg.partitionByExprs = maxPartitionExprs;
-      partitionGroups.remove(maxPg);
-      partitionGroups.add(0, maxPg);
-      inputPartitionExprs.addAll(maxGroupingExprs);
-    }
-  }
-
-  /**
-   * Order partition groups (and the sort groups within them) by increasing
-   * totalOutputTupleSize. This minimizes the total volume of data that needs to be
-   * repartitioned and sorted.
-   * The non-partitioning partition group (the first one with no partition-by exprs),
-   * if there is one, is excluded from the sort and placed at the end.
-   */
-  private void orderGroups(List<PartitionGroup> partitionGroups) {
-    // Locate the non-partitioning group and take it out of the list before sorting.
-    PartitionGroup unpartitioned = null;
-    for (PartitionGroup candidate: partitionGroups) {
-      if (!candidate.partitionByExprs.isEmpty()) continue;
-      unpartitioned = candidate;
-      break;
-    }
-    final boolean hasUnpartitioned = unpartitioned != null;
-    if (hasUnpartitioned) partitionGroups.remove(unpartitioned);
-
-    // order by ascending combined output tuple size
-    Comparator<PartitionGroup> byOutputTupleSize = new Comparator<PartitionGroup>() {
-      public int compare(PartitionGroup lhs, PartitionGroup rhs) {
-        Preconditions.checkState(lhs.totalOutputTupleSize > 0);
-        Preconditions.checkState(rhs.totalOutputTupleSize > 0);
-        if (lhs.totalOutputTupleSize < rhs.totalOutputTupleSize) return -1;
-        if (lhs.totalOutputTupleSize > rhs.totalOutputTupleSize) return 1;
-        return 0;
-      }
-    };
-    Collections.sort(partitionGroups, byOutputTupleSize);
-    if (hasUnpartitioned) partitionGroups.add(unpartitioned);
-
-    for (PartitionGroup group: partitionGroups) {
-      group.orderSortGroups();
-    }
-  }
-
-  /**
-   * Create SortInfo, including sort tuple, to sort entire input row
-   * on sortExprs.
-   *
-   * The sort tuple gets a copy of every materialized slot of every tuple of 'input',
-   * followed by one slot per distinct TupleIsNullPredicate found in those rhs exprs
-   * of input's output smap that are bound by input's tuple ids.
-   * The ordering exprs of the returned SortInfo are sortExprs substituted with the
-   * input-slot -> sort-slot mapping; isAsc and nullsFirst are passed to the SortInfo
-   * as given.
-   */
-  private SortInfo createSortInfo(
-      PlanNode input, List<Expr> sortExprs, List<Boolean> isAsc,
-      List<Boolean> nullsFirst) {
-    // create tuple for sort output = the entire materialized input in a single tuple
-    TupleDescriptor sortTupleDesc =
-        analyzer_.getDescTbl().createTupleDescriptor("sort-tuple");
-    // maps SlotRefs of the input slots to SlotRefs of their copies in the sort tuple
-    ExprSubstitutionMap sortSmap = new ExprSubstitutionMap();
-    // exprs handed to setMaterializedTupleInfo(), in the order the sort tuple's
-    // slots are created
-    List<Expr> sortSlotExprs = Lists.newArrayList();
-    sortTupleDesc.setIsMaterialized(true);
-    for (TupleId tid: input.getTupleIds()) {
-      TupleDescriptor tupleDesc = analyzer_.getTupleDesc(tid);
-      for (SlotDescriptor inputSlotDesc: tupleDesc.getSlots()) {
-        // non-materialized input slots are not carried through the sort
-        if (!inputSlotDesc.isMaterialized()) continue;
-        SlotDescriptor sortSlotDesc =
-            analyzer_.copySlotDescriptor(inputSlotDesc, sortTupleDesc);
-        // all output slots need to be materialized
-        sortSlotDesc.setIsMaterialized(true);
-        sortSmap.put(new SlotRef(inputSlotDesc), new SlotRef(sortSlotDesc));
-        sortSlotExprs.add(new SlotRef(inputSlotDesc));
-      }
-    }
-
-    // Lhs exprs to be substituted in ancestor plan nodes could have a rhs that contains
-    // TupleIsNullPredicates. TupleIsNullPredicates require specific tuple ids for
-    // evaluation. Since this sort materializes a new tuple, it's impossible to evaluate
-    // TupleIsNullPredicates referring to this sort's input after this sort.
-    // To preserve the information whether an input tuple was null or not at this sort
-    // node, we materialize those rhs TupleIsNullPredicates, which are then substituted
-    // by a SlotRef into the sort's tuple in ancestor nodes (IMPALA-1519).
-    ExprSubstitutionMap inputSmap = input.getOutputSmap();
-    if (inputSmap != null) {
-      List<Expr> tupleIsNullPredsToMaterialize = Lists.newArrayList();
-      for (int i = 0; i < inputSmap.size(); ++i) {
-        Expr rhsExpr = inputSmap.getRhs().get(i);
-        // Ignore substitutions that are irrelevant at this plan node and its ancestors.
-        if (!rhsExpr.isBoundByTupleIds(input.getTupleIds())) continue;
-        rhsExpr.collect(TupleIsNullPredicate.class, tupleIsNullPredsToMaterialize);
-      }
-      Expr.removeDuplicates(tupleIsNullPredsToMaterialize);
-
-      // Materialize relevant unique TupleIsNullPredicates.
-      for (Expr tupleIsNullPred: tupleIsNullPredsToMaterialize) {
-        // fresh slot in the sort tuple, typed and labelled after the predicate
-        SlotDescriptor sortSlotDesc = analyzer_.addSlotDescriptor(sortTupleDesc);
-        sortSlotDesc.setType(tupleIsNullPred.getType());
-        sortSlotDesc.setIsMaterialized(true);
-        sortSlotDesc.setSourceExpr(tupleIsNullPred);
-        sortSlotDesc.setLabel(tupleIsNullPred.toSql());
-        sortSlotExprs.add(tupleIsNullPred.clone());
-      }
-    }
-
-    // the ordering exprs must refer to the sort tuple rather than to the input tuples
-    SortInfo sortInfo = new SortInfo(
-        Expr.substituteList(sortExprs, sortSmap, analyzer_, false), isAsc, nullsFirst);
-    LOG.trace("sortinfo exprs: " + Expr.debugString(sortInfo.getOrderingExprs()));
-    sortInfo.setMaterializedTupleInfo(sortTupleDesc, sortSlotExprs);
-    return sortInfo;
-  }
-
-  /**
-   * Create plan tree for the entire sort group, including all contained window groups.
-   * If the sort group has partition-by exprs or order-by elements, a SortNode on
-   * (partition-by exprs, order-by exprs) is placed on top of root; one
-   * AnalyticEvalNode per window group is then stacked on top of that, in the order
-   * of sortGroup.windowGroups.
-   * Marks the SortNode as requiring its input to be partitioned if partitionExprs
-   * is not null (partitionExprs represent the data partition of the entire partition
-   * group of which this sort group is a part). Empty partitionExprs request
-   * unpartitioned input.
-   * Returns the root of the resulting plan tree.
-   */
-  private PlanNode createSortGroupPlan(PlanNode root, SortGroup sortGroup,
-      List<Expr> partitionExprs) throws ImpalaException {
-    List<Expr> partitionByExprs = sortGroup.partitionByExprs;
-    List<OrderByElement> orderByElements = sortGroup.orderByElements;
-    // the following three stay null if no SortNode is created
-    ExprSubstitutionMap sortSmap = null;
-    TupleId sortTupleId = null;
-    TupleDescriptor bufferedTupleDesc = null;
-    // map from input to buffered tuple
-    ExprSubstitutionMap bufferedSmap = new ExprSubstitutionMap();
-
-    // sort on partition by (pb) + order by (ob) exprs and create pb/ob predicates
-    if (!partitionByExprs.isEmpty() || !orderByElements.isEmpty()) {
-      // first sort on partitionExprs (direction doesn't matter)
-      List<Expr> sortExprs = Lists.newArrayList(partitionByExprs);
-      // Boolean.TRUE is the canonical instance; no need to allocate via new Boolean()
-      List<Boolean> isAsc =
-          Lists.newArrayList(Collections.nCopies(sortExprs.size(), Boolean.TRUE));
-      // TODO: utilize a direction and nulls/first last that has benefit
-      // for subsequent sort groups
-      List<Boolean> nullsFirst =
-          Lists.newArrayList(Collections.nCopies(sortExprs.size(), Boolean.TRUE));
-
-      // then sort on orderByExprs
-      for (OrderByElement orderByElement: orderByElements) {
-        sortExprs.add(orderByElement.getExpr());
-        isAsc.add(orderByElement.isAsc());
-        nullsFirst.add(orderByElement.getNullsFirstParam());
-      }
-
-      SortInfo sortInfo = createSortInfo(root, sortExprs, isAsc, nullsFirst);
-      SortNode sortNode = new SortNode(ctx_.getNextNodeId(), root, sortInfo, false, 0);
-
-      // if this sort group does not have partitioning exprs, we want the sort
-      // to be executed like a regular distributed sort
-      if (!partitionByExprs.isEmpty()) sortNode.setIsAnalyticSort(true);
-
-      if (partitionExprs != null) {
-        // create required input partition
-        DataPartition inputPartition = DataPartition.UNPARTITIONED;
-        if (!partitionExprs.isEmpty()) {
-          inputPartition = DataPartition.hashPartitioned(partitionExprs);
-        }
-        sortNode.setInputPartition(inputPartition);
-      }
-
-      root = sortNode;
-      root.init(analyzer_);
-      sortSmap = sortNode.getOutputSmap();
-
-      // create bufferedTupleDesc and bufferedSmap
-      sortTupleId = sortNode.tupleIds_.get(0);
-      bufferedTupleDesc =
-          analyzer_.getDescTbl().copyTupleDescriptor(sortTupleId, "buffered-tuple");
-      LOG.trace("desctbl: " + analyzer_.getDescTbl().debugString());
-
-      // pair up the slots of the sort tuple and of its copy by position
-      List<SlotDescriptor> inputSlots = analyzer_.getTupleDesc(sortTupleId).getSlots();
-      List<SlotDescriptor> bufferedSlots = bufferedTupleDesc.getSlots();
-      for (int i = 0; i < inputSlots.size(); ++i) {
-        bufferedSmap.put(
-            new SlotRef(inputSlots.get(i)), new SlotRef(bufferedSlots.get(i)));
-      }
-    }
-
-    // create one AnalyticEvalNode per window group
-    for (WindowGroup windowGroup: sortGroup.windowGroups) {
-      // Create partition-by (pb) and order-by (ob) less-than predicates between the
-      // input tuple (the output of the preceding sort) and a buffered tuple that is
-      // identical to the input tuple. We need a different tuple descriptor for the
-      // buffered tuple because the generated predicates should compare two different
-      // tuple instances from the same input stream (i.e., the predicates should be
-      // evaluated over a row that is composed of the input and the buffered tuple).
-
-      // we need to remap the pb/ob exprs to a) the sort output, b) our buffer of the
-      // sort input
-      Expr partitionByEq = null;
-      if (!windowGroup.partitionByExprs.isEmpty()) {
-        partitionByEq = createNullMatchingEquals(
-            Expr.substituteList(windowGroup.partitionByExprs, sortSmap, analyzer_, false),
-            sortTupleId, bufferedSmap);
-        LOG.trace("partitionByEq: " + partitionByEq.debugString());
-      }
-      Expr orderByEq = null;
-      if (!windowGroup.orderByElements.isEmpty()) {
-        orderByEq = createNullMatchingEquals(
-            OrderByElement.getOrderByExprs(OrderByElement.substitute(
-                windowGroup.orderByElements, sortSmap, analyzer_)),
-            sortTupleId, bufferedSmap);
-        LOG.trace("orderByEq: " + orderByEq.debugString());
-      }
-
-      // each AnalyticEvalNode consumes the plan built so far
-      root = new AnalyticEvalNode(ctx_.getNextNodeId(), root,
-          windowGroup.analyticFnCalls, windowGroup.partitionByExprs,
-          windowGroup.orderByElements, windowGroup.window,
-          windowGroup.physicalIntermediateTuple, windowGroup.physicalOutputTuple,
-          windowGroup.logicalToPhysicalSmap,
-          partitionByEq, orderByEq, bufferedTupleDesc);
-      root.init(analyzer_);
-    }
-    return root;
-  }
-
-  /**
-   * Builds and analyzes a predicate that is true if every expr in exprs is either
-   * equal to or null together with its counterpart obtained via bufferedSmap.
-   * exprs must not be empty.
-   */
-  private Expr createNullMatchingEquals(List<Expr> exprs, TupleId inputTid,
-      ExprSubstitutionMap bufferedSmap) {
-    Preconditions.checkState(!exprs.isEmpty());
-    // build the conjunct chain starting at the first expr, then analyze it
-    Expr predicate = createNullMatchingEqualsAux(exprs, 0, inputTid, bufferedSmap);
-    predicate.analyzeNoThrow(analyzer_);
-    return predicate;
-  }
-
-  /**
-   * Create an unanalyzed predicate that checks if elements >= i are equal or
-   * both sides are null. The rhs of each comparison is elements[i] substituted with
-   * bufferedSmap; every element must be bound by inputTid.
-   *
-   * The predicate has the form
-   * ((lhs[i] is null && rhs[i] is null) || (
-   *   lhs[i] is not null && rhs[i] is not null && lhs[i] = rhs[i]))
-   * && <createNullMatchingEqualsAux(i + 1)>
-   * where the recursion ends in a 'true' literal once i is past the last element.
-   */
-  private Expr createNullMatchingEqualsAux(List<Expr> elements, int i,
-      TupleId inputTid, ExprSubstitutionMap bufferedSmap) {
-    // nothing left to compare
-    if (i >= elements.size()) return new BoolLiteral(true);
-
-    // the two sides of the comparison for elements[i]
-    Expr inputExpr = elements.get(i);
-    Preconditions.checkState(inputExpr.isBound(inputTid));
-    Expr bufferedExpr = inputExpr.substitute(bufferedSmap, analyzer_, false);
-
-    // lhs is null && rhs is null
-    Expr bothNull = new CompoundPredicate(CompoundPredicate.Operator.AND,
-        new IsNullPredicate(inputExpr, false),
-        new IsNullPredicate(bufferedExpr, false));
-    // lhs is not null && rhs is not null
-    Expr neitherNull = new CompoundPredicate(CompoundPredicate.Operator.AND,
-        new IsNullPredicate(inputExpr, true),
-        new IsNullPredicate(bufferedExpr, true));
-    // ... && lhs = rhs
-    Expr equalNotNull = new CompoundPredicate(CompoundPredicate.Operator.AND,
-        neitherNull,
-        new BinaryPredicate(BinaryPredicate.Operator.EQ, inputExpr, bufferedExpr));
-
-    // predicate over the remaining elements
-    Expr rest = createNullMatchingEqualsAux(elements, i + 1, inputTid, bufferedSmap);
-    Expr matchAtI =
-        new CompoundPredicate(CompoundPredicate.Operator.OR, bothNull, equalNotNull);
-    return new CompoundPredicate(CompoundPredicate.Operator.AND, matchAtI, rest);
-  }
-
-  /**
-   * A set of AnalyticExprs with identical partition-by exprs, order-by elements and
-   * window. Every member is stored as parallel entries in analyticExprs,
-   * analyticFnCalls, logicalOutputSlots and logicalIntermediateSlots.
-   */
-  private static class WindowGroup {
-    // Specification shared by all members; taken from the first analytic expr.
-    public final List<Expr> partitionByExprs;
-    public final List<OrderByElement> orderByElements;
-    // isCompatible() copes with a null window on either side
-    public final AnalyticWindow window;
-
-    // Analytic exprs belonging to this window group and their corresponding logical
-    // intermediate and output slots from AnalyticInfo.intermediateTupleDesc_
-    // and AnalyticInfo.outputTupleDesc_.
-    public final List<AnalyticExpr> analyticExprs = Lists.newArrayList();
-    // Result of getFnCall() for every analytic expr.
-    public final List<Expr> analyticFnCalls = Lists.newArrayList();
-    public final List<SlotDescriptor> logicalOutputSlots = Lists.newArrayList();
-    public final List<SlotDescriptor> logicalIntermediateSlots = Lists.newArrayList();
-
-    // Physical output and intermediate tuples as well as an smap that maps the
-    // corresponding logical output slots to their physical slots in physicalOutputTuple.
-    // Set in init().
-    public TupleDescriptor physicalOutputTuple;
-    public TupleDescriptor physicalIntermediateTuple;
-    public final ExprSubstitutionMap logicalToPhysicalSmap = new ExprSubstitutionMap();
-
-    /**
-     * Creates a window group with the specification of analyticExpr and with
-     * analyticExpr as its only member.
-     */
-    public WindowGroup(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot,
-        SlotDescriptor logicalIntermediateSlot) {
-      partitionByExprs = analyticExpr.getPartitionExprs();
-      orderByElements = analyticExpr.getOrderByElements();
-      window = analyticExpr.getWindow();
-      append(analyticExpr, logicalOutputSlot, logicalIntermediateSlot);
-    }
-
-    /**
-     * Appends one member to the four parallel lists.
-     */
-    private void append(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot,
-        SlotDescriptor logicalIntermediateSlot) {
-      analyticExprs.add(analyticExpr);
-      analyticFnCalls.add(analyticExpr.getFnCall());
-      logicalOutputSlots.add(logicalOutputSlot);
-      logicalIntermediateSlots.add(logicalIntermediateSlot);
-    }
-
-    /**
-     * True if the function of analyticExpr is AnalyticExpr.FIRST_VALUE_REWRITE; such
-     * an expr must be evaluated in a WindowGroup of its own.
-     */
-    private static boolean requiresIndependentEval(AnalyticExpr analyticExpr) {
-      String fnName = analyticExpr.getFnCall().getFnName().getFunction();
-      return fnName.equals(AnalyticExpr.FIRST_VALUE_REWRITE);
-    }
-
-    /**
-     * True if analyticExpr can join this group: neither our first member nor
-     * analyticExpr requires independent evaluation, and the partition exprs, the
-     * ordering elements and the window of analyticExpr match ours.
-     */
-    public boolean isCompatible(AnalyticExpr analyticExpr) {
-      boolean standalone = requiresIndependentEval(analyticExprs.get(0))
-          || requiresIndependentEval(analyticExpr);
-      if (standalone) return false;
-
-      boolean samePartitionBy =
-          Expr.equalSets(analyticExpr.getPartitionExprs(), partitionByExprs);
-      if (!samePartitionBy) return false;
-      if (!analyticExpr.getOrderByElements().equals(orderByElements)) return false;
-
-      // windows match if both are absent or both are present and equal
-      AnalyticWindow otherWindow = analyticExpr.getWindow();
-      if (window == null || otherWindow == null) return window == otherWindow;
-      return otherWindow.equals(window);
-    }
-
-    /**
-     * Adds the given analytic expr and its logical slots to this window group.
-     * analyticExpr must be compatible with 'this'.
-     */
-    public void add(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot,
-        SlotDescriptor logicalIntermediateSlot) {
-      Preconditions.checkState(isCompatible(analyticExpr));
-      append(analyticExpr, logicalOutputSlot, logicalIntermediateSlot);
-    }
-
-    /**
-     * Creates the physical output and intermediate tuples as well as the logical to
-     * physical smap for this window group, and computes the mem layout of the tuple
-     * descriptors. If no separate intermediate tuple is required, the output tuple
-     * doubles as the intermediate tuple. Must be called only once.
-     */
-    public void init(Analyzer analyzer, String tupleName) {
-      Preconditions.checkState(physicalOutputTuple == null);
-      Preconditions.checkState(physicalIntermediateTuple == null);
-      Preconditions.checkState(analyticFnCalls.size() == analyticExprs.size());
-
-      // If needed, create the intermediate tuple first to maintain
-      // intermediateTupleId < outputTupleId for debugging purposes and consistency with
-      // tuple creation for aggregations.
-      boolean separateIntermediate =
-          AggregateInfoBase.requiresIntermediateTuple(analyticFnCalls);
-      if (separateIntermediate) {
-        physicalIntermediateTuple =
-            analyzer.getDescTbl().createTupleDescriptor(tupleName + "intermed");
-      }
-      physicalOutputTuple =
-          analyzer.getDescTbl().createTupleDescriptor(tupleName + "out");
-      if (!separateIntermediate) physicalIntermediateTuple = physicalOutputTuple;
-
-      int numExprs = analyticExprs.size();
-      Preconditions.checkState(numExprs == logicalIntermediateSlots.size());
-      Preconditions.checkState(numExprs == logicalOutputSlots.size());
-      for (int i = 0; i < numExprs; ++i) {
-        // copy the logical output slot into the physical output tuple
-        SlotDescriptor logicalOut = logicalOutputSlots.get(i);
-        SlotDescriptor physicalOut =
-            analyzer.copySlotDescriptor(logicalOut, physicalOutputTuple);
-        physicalOut.setIsMaterialized(true);
-        if (separateIntermediate) {
-          // same for the intermediate slot
-          SlotDescriptor physicalIntermediate = analyzer.copySlotDescriptor(
-              logicalIntermediateSlots.get(i), physicalIntermediateTuple);
-          physicalIntermediate.setIsMaterialized(true);
-        }
-        logicalToPhysicalSmap.put(new SlotRef(logicalOut), new SlotRef(physicalOut));
-      }
-      physicalOutputTuple.computeMemLayout();
-      if (separateIntermediate) physicalIntermediateTuple.computeMemLayout();
-    }
-  }
-
-  /**
-   * Distributes the materialized analytic exprs of analyticInfo_ over WindowGroups:
-   * each expr joins the first existing group it is compatible with, or else starts
-   * a new group. Exprs whose output slot is not materialized are left out.
-   */
-  private List<WindowGroup> collectWindowGroups() {
-    List<Expr> analyticExprs = analyticInfo_.getAnalyticExprs();
-    List<WindowGroup> groups = Lists.newArrayList();
-    for (int i = 0; i < analyticExprs.size(); ++i) {
-      AnalyticExpr analyticExpr = (AnalyticExpr) analyticExprs.get(i);
-      SlotDescriptor outputSlot = analyticInfo_.getOutputTupleDesc().getSlots().get(i);
-      // Do not generate the plan for non-materialized analytic exprs.
-      if (!outputSlot.isMaterialized()) continue;
-      SlotDescriptor intermediateSlot =
-          analyticInfo_.getIntermediateTupleDesc().getSlots().get(i);
-
-      // look for the first group that accepts this expr
-      WindowGroup target = null;
-      for (WindowGroup candidate: groups) {
-        if (candidate.isCompatible(analyticExpr)) {
-          target = candidate;
-          break;
-        }
-      }
-      if (target == null) {
-        groups.add(new WindowGroup(analyticExpr, outputSlot, intermediateSlot));
-      } else {
-        target.add(analyticExpr, outputSlot, intermediateSlot);
-      }
-    }
-    return groups;
-  }
-
-  /**
-   * Collection of WindowGroups that share the same partition-by/order-by
-   * specification.
-   */
-  private static class SortGroup {
-    public List<Expr> partitionByExprs;
-    public List<OrderByElement> orderByElements;
-    public List<WindowGroup> windowGroups = Lists.newArrayList();
-
-    // sum of windowGroups.physicalOutputTuple.getByteSize(); -1 until init() is called
-    public int totalOutputTupleSize = -1;
-
-    /**
-     * Creates a sort group with the partition-by/order-by specification of
-     * windowGroup and with windowGroup as its only member.
-     */
-    public SortGroup(WindowGroup windowGroup) {
-      partitionByExprs = windowGroup.partitionByExprs;
-      orderByElements = windowGroup.orderByElements;
-      windowGroups.add(windowGroup);
-    }
-
-    /**
-     * True if the partition and ordering exprs of windowGroup match ours.
-     */
-    public boolean isCompatible(WindowGroup windowGroup) {
-      return Expr.equalSets(windowGroup.partitionByExprs, partitionByExprs)
-          && windowGroup.orderByElements.equals(orderByElements);
-    }
-
-    /**
-     * Adds windowGroup to this sort group. windowGroup must be compatible with 'this'.
-     */
-    public void add(WindowGroup windowGroup) {
-      Preconditions.checkState(isCompatible(windowGroup));
-      windowGroups.add(windowGroup);
-    }
-
-    /**
-     * Return true if 'this' and other have compatible partition exprs and
-     * other's orderByElements are a prefix of ours, i.e. other has at most as many
-     * order-by elements as we do and they agree with ours in expr, direction and
-     * nulls ordering at every position. A sort on our orderByElements then also
-     * satisfies other's ordering.
-     * NOTE: despite the method name, 'this' is the side with the longer (or equally
-     * long) ordering.
-     */
-    public boolean isPrefixOf(SortGroup other) {
-      if (other.orderByElements.size() > orderByElements.size()) return false;
-      if (!Expr.equalSets(partitionByExprs, other.partitionByExprs)) return false;
-      for (int i = 0; i < other.orderByElements.size(); ++i) {
-        OrderByElement ob = orderByElements.get(i);
-        OrderByElement otherOb = other.orderByElements.get(i);
-        // TODO: compare equiv classes by comparing each equiv class's placeholder
-        // slotref
-        if (!ob.getExpr().equals(otherOb.getExpr())) return false;
-        if (ob.isAsc() != otherOb.isAsc()) return false;
-        if (ob.nullsFirst() != otherOb.nullsFirst()) return false;
-      }
-      return true;
-    }
-
-    /**
-     * Adds other's window groups to ours. Requires isPrefixOf(other), i.e. other's
-     * orderByElements must be a prefix of ours; our own partitionByExprs and
-     * orderByElements are kept as they are.
-     * totalOutputTupleSize is not adjusted here; init() recomputes it.
-     */
-    public void absorb(SortGroup other) {
-      Preconditions.checkState(isPrefixOf(other));
-      windowGroups.addAll(other.windowGroups);
-    }
-
-    /**
-     * Compute totalOutputTupleSize. The physical output tuples of all window groups
-     * must be materialized and must have a byte size other than -1.
-     */
-    public void init() {
-      totalOutputTupleSize = 0;
-      for (WindowGroup g: windowGroups) {
-        TupleDescriptor outputTuple = g.physicalOutputTuple;
-        Preconditions.checkState(outputTuple.isMaterialized());
-        Preconditions.checkState(outputTuple.getByteSize() != -1);
-        totalOutputTupleSize += outputTuple.getByteSize();
-      }
-    }
-
-    /**
-     * Orders WindowGroups by ascending byte size of their physical output tuple.
-     */
-    private static class SizeLt implements Comparator<WindowGroup> {
-      public int compare(WindowGroup wg1, WindowGroup wg2) {
-        Preconditions.checkState(wg1.physicalOutputTuple != null
-            && wg1.physicalOutputTuple.getByteSize() != -1);
-        Preconditions.checkState(wg2.physicalOutputTuple != null
-            && wg2.physicalOutputTuple.getByteSize() != -1);
-        int diff = wg1.physicalOutputTuple.getByteSize()
-            - wg2.physicalOutputTuple.getByteSize();
-        return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
-      }
-    }
-
-    // the comparator holds no state, so a single shared instance is enough
-    private static final SizeLt SIZE_LT = new SizeLt();
-
-    /**
-     * Order window groups by increasing size of the output tuple. This minimizes
-     * the total volume of data that needs to be buffered.
-     */
-    public void orderWindowGroups() {
-      Collections.sort(windowGroups, SIZE_LT);
-    }
-  }
-
-  /**
-   * Distributes windowGroups over SortGroups: each window group joins the first
-   * existing sort group it is compatible with (see SortGroup.isCompatible()), or
-   * else starts a new sort group.
-   */
-  private List<SortGroup> collectSortGroups(List<WindowGroup> windowGroups) {
-    List<SortGroup> result = Lists.newArrayList();
-    for (WindowGroup windowGroup: windowGroups) {
-      // look for the first sort group that accepts this window group
-      SortGroup target = null;
-      for (SortGroup candidate: result) {
-        if (candidate.isCompatible(windowGroup)) {
-          target = candidate;
-          break;
-        }
-      }
-      if (target == null) {
-        result.add(new SortGroup(windowGroup));
-      } else {
-        target.add(windowGroup);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Collection of SortGroups that have compatible partition-by specifications.
-   */
-  private static class PartitionGroup {
-    public List<Expr> partitionByExprs;
-    public List<SortGroup> sortGroups = Lists.newArrayList();
-
-    // sum of sortGroups.windowGroups.physicalOutputTuple.getByteSize();
-    // kept up to date by the constructor, add() and merge()
-    public int totalOutputTupleSize = -1;
-
-    /**
-     * Creates a partition group with the partition exprs and the output tuple size
-     * of sortGroup and with sortGroup as its only member.
-     */
-    public PartitionGroup(SortGroup sortGroup) {
-      partitionByExprs = sortGroup.partitionByExprs;
-      sortGroups.add(sortGroup);
-      totalOutputTupleSize = sortGroup.totalOutputTupleSize;
-    }
-
-    /**
-     * True if the partition exprs of sortGroup are compatible with ours.
-     * For now that means equality.
-     */
-    public boolean isCompatible(SortGroup sortGroup) {
-      return Expr.equalSets(sortGroup.partitionByExprs, partitionByExprs);
-    }
-
-    /**
-     * Adds sortGroup and its output tuple size to this partition group.
-     * sortGroup must be compatible with 'this'.
-     */
-    public void add(SortGroup sortGroup) {
-      Preconditions.checkState(isCompatible(sortGroup));
-      sortGroups.add(sortGroup);
-      totalOutputTupleSize += sortGroup.totalOutputTupleSize;
-    }
-
-    /**
-     * Merge 'other' into 'this'
-     * - partitionByExprs is the intersection of the two
-     * - sortGroups becomes the union
-     * - totalOutputTupleSize becomes the sum of the two
-     */
-    public void merge(PartitionGroup other) {
-      partitionByExprs = Expr.intersect(partitionByExprs, other.partitionByExprs);
-      Preconditions.checkState(Expr.getNumDistinctValues(partitionByExprs) >= 0);
-      sortGroups.addAll(other.sortGroups);
-      // keep the size in sync with sortGroups, as add() does; otherwise the merged
-      // group is ordered as if it only contained its original sort groups
-      totalOutputTupleSize += other.totalOutputTupleSize;
-    }
-
-    /**
-     * Order sort groups by increasing totalOutputTupleSize. This minimizes the total
-     * volume of data that needs to be sorted. Also orders the window groups within
-     * every sort group.
-     */
-    public void orderSortGroups() {
-      Collections.sort(sortGroups,
-          new Comparator<SortGroup>() {
-            public int compare(SortGroup sg1, SortGroup sg2) {
-              Preconditions.checkState(sg1.totalOutputTupleSize > 0);
-              Preconditions.checkState(sg2.totalOutputTupleSize > 0);
-              // both sizes are positive, so the difference cannot overflow
-              int diff = sg1.totalOutputTupleSize - sg2.totalOutputTupleSize;
-              return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
-            }
-          });
-      for (SortGroup sortGroup: sortGroups) {
-        sortGroup.orderWindowGroups();
-      }
-    }
-  }
-
-  /**
-   * Distributes sortGroups over PartitionGroups: each sort group joins the first
-   * existing partition group it is compatible with (see
-   * PartitionGroup.isCompatible()), or else starts a new partition group.
-   */
-  private List<PartitionGroup> collectPartitionGroups(List<SortGroup> sortGroups) {
-    List<PartitionGroup> result = Lists.newArrayList();
-    for (SortGroup sortGroup: sortGroups) {
-      // look for the first partition group that accepts this sort group
-      PartitionGroup target = null;
-      for (PartitionGroup candidate: result) {
-        if (candidate.isCompatible(sortGroup)) {
-          target = candidate;
-          break;
-        }
-      }
-      if (target == null) {
-        result.add(new PartitionGroup(sortGroup));
-      } else {
-        target.add(sortGroup);
-      }
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/CohortId.java b/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
deleted file mode 100644
index d58e5c4..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import com.cloudera.impala.common.Id;
-import com.cloudera.impala.common.IdGenerator;
-
-/**
- * Id type for cohorts. Instances are meant to be obtained from the generator
- * returned by createGenerator().
- */
-public class CohortId extends Id<CohortId> {
-  // Construction only allowed via an IdGenerator.
-  protected CohortId(int id) {
-    super(id);
-  }
-
-  /**
-   * Returns a new generator. Its getNextId() wraps the generator's current counter
-   * value and advances the counter; its getMaxId() wraps the counter value minus one.
-   */
-  public static IdGenerator<CohortId> createGenerator() {
-    IdGenerator<CohortId> generator = new IdGenerator<CohortId>() {
-      @Override
-      public CohortId getNextId() {
-        int id = nextId_++;
-        return new CohortId(id);
-      }
-
-      @Override
-      public CohortId getMaxId() {
-        int lastId = nextId_ - 1;
-        return new CohortId(lastId);
-      }
-    };
-    return generator;
-  }
-
-  /**
-   * Formats the id as a decimal number, zero-padded to at least two digits.
-   */
-  @Override
-  public String toString() {
-    String padded = String.format("%02d", id_);
-    return padded;
-  }
-}