You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/10 16:36:39 UTC

[1/2] impala git commit: IMPALA-7203. Support UDFs in LocalCatalog

Repository: impala
Updated Branches:
  refs/heads/master 3e17705ec -> 4af3a7853


IMPALA-7203. Support UDFs in LocalCatalog

This adds support to LocalCatalog to load persistent UDFs (both Java and
native) from the HMS. Transient UDFs are not supported, since, without a
central component to store them between restarts, there isn't any clear
path to doing so.

The various UDF tests do not yet pass reliably with this change since so
many of them rely on the transient Java UDF functionality. This is a
legacy feature that isn't commonly used today, and we may not be able to
support it in LocalCatalog.

For now, I tested manually that I could create, drop, and run some UDFs
and that they show up in 'show functions'.

Change-Id: I6130d07b9c641525382a618a9f8da048c7ae75ed
Reviewed-on: http://gerrit.cloudera.org:8080/11053
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/35713cac
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/35713cac
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/35713cac

Branch: refs/heads/master
Commit: 35713cace06f32a575ddee23863141643dd148a0
Parents: 3e17705
Author: Todd Lipcon <to...@cloudera.com>
Authored: Wed Jul 25 15:15:49 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Aug 10 05:13:12 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/FunctionCallExpr.java       |   2 -
 .../impala/catalog/CatalogServiceCatalog.java   | 171 +----------
 .../main/java/org/apache/impala/catalog/Db.java |  88 +-----
 .../catalog/local/DirectMetaProvider.java       |  20 ++
 .../apache/impala/catalog/local/LocalDb.java    | 194 +++++++++++-
 .../impala/catalog/local/MetaProvider.java      |  11 +
 .../impala/service/CatalogOpExecutor.java       |   4 +-
 .../org/apache/impala/util/FunctionUtils.java   | 295 +++++++++++++++++++
 .../org/apache/impala/util/PatternMatcher.java  |   9 +-
 9 files changed, 539 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/35713cac/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index 216f543..291420f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -22,8 +22,6 @@ import java.util.List;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.AggregateFunction;
 import org.apache.impala.catalog.BuiltinsDb;
-import org.apache.impala.catalog.Catalog;
-import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.ScalarFunction;

http://git-wip-us.apache.org/repos/asf/impala/blob/35713cac/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 19006ae..7efff99 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -18,16 +18,12 @@
 package org.apache.impala.catalog;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,26 +32,19 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.ResourceType;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.hadoop.hive.ql.exec.FunctionUtils;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
-import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.JniUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
-import org.apache.impala.hive.executor.UdfExecutor;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TCatalog;
 import org.apache.impala.thrift.TCatalogObject;
@@ -69,6 +58,7 @@ import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TTableUsageMetrics;
 import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.SentryProxy;
 import org.apache.log4j.Logger;
@@ -78,7 +68,6 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 
 import com.codahale.metrics.Timer;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -163,7 +152,7 @@ import com.google.common.collect.Sets;
  * loading thread pool.
  */
 public class CatalogServiceCatalog extends Catalog {
-  private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
+  public static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
 
   private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
   private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
@@ -204,9 +193,6 @@ public class CatalogServiceCatalog extends Catalog {
   // policy metadata. Null if Sentry Service is not enabled.
   private final SentryProxy sentryProxy_;
 
-  // Local temporary directory to copy UDF Jars.
-  private static String localLibraryPath_;
-
   // Log of deleted catalog objects.
   private final CatalogDeltaLog deleteLog_;
 
@@ -220,6 +206,8 @@ public class CatalogServiceCatalog extends Catalog {
 
   private final TopicUpdateLog topicUpdateLog_ = new TopicUpdateLog();
 
+  private final String localLibraryPath_;
+
   /**
    * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata
    * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in
@@ -249,7 +237,7 @@ public class CatalogServiceCatalog extends Catalog {
     } else {
       sentryProxy_ = null;
     }
-    localLibraryPath_ = new String("file://" + localLibraryPath);
+    localLibraryPath_ = localLibraryPath;
     deleteLog_ = new CatalogDeltaLog();
   }
 
@@ -780,131 +768,6 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Checks if the Hive function 'fn' is Impala compatible. A function is Impala
-   * compatible iff
-   *
-   * 1. The function is JAVA based,
-   * 2. Has exactly one binary resource associated (We don't support loading
-   *    dependencies yet) and
-   * 3. The binary is of type JAR.
-   *
-   * Returns true if compatible and false otherwise. In case of incompatible
-   * functions 'incompatMsg' has the reason for the incompatibility.
-   * */
-   public static boolean isFunctionCompatible(
-       org.apache.hadoop.hive.metastore.api.Function fn, StringBuilder incompatMsg) {
-    boolean isCompatible = true;
-    if (fn.getFunctionType() != FunctionType.JAVA) {
-      isCompatible = false;
-      incompatMsg.append("Function type: " + fn.getFunctionType().name()
-          + " is not supported. Only " + FunctionType.JAVA.name() + " functions "
-          + "are supported.");
-    } else if (fn.getResourceUrisSize() == 0) {
-      isCompatible = false;
-      incompatMsg.append("No executable binary resource (like a JAR file) is " +
-          "associated with this function. To fix this, recreate the function by " +
-          "specifying a 'location' in the function create statement.");
-    } else if (fn.getResourceUrisSize() != 1) {
-      isCompatible = false;
-      List<String> resourceUris = Lists.newArrayList();
-      for (ResourceUri resource: fn.getResourceUris()) {
-        resourceUris.add(resource.getUri());
-      }
-      incompatMsg.append("Impala does not support multiple Jars for dependencies."
-          + "(" + Joiner.on(",").join(resourceUris) + ") ");
-    } else if (fn.getResourceUris().get(0).getResourceType() != ResourceType.JAR) {
-      isCompatible = false;
-      incompatMsg.append("Function binary type: " +
-        fn.getResourceUris().get(0).getResourceType().name()
-        + " is not supported. Only " + ResourceType.JAR.name()
-        + " type is supported.");
-    }
-    return isCompatible;
-  }
-
-  /**
-   * Returns a list of Impala Functions, one per compatible "evaluate" method in the UDF
-   * class referred to by the given Java function. This method copies the UDF Jar
-   * referenced by "function" to a temporary file in localLibraryPath_ and loads it
-   * into the jvm. Then we scan all the methods in the class using reflection and extract
-   * those methods and create corresponding Impala functions. Currently Impala supports
-   * only "JAR" files for symbols and also a single Jar containing all the dependent
-   * classes rather than a set of Jar files.
-   */
-  public static List<Function> extractFunctions(String db,
-      org.apache.hadoop.hive.metastore.api.Function function)
-      throws ImpalaRuntimeException{
-    List<Function> result = Lists.newArrayList();
-    List<String> addedSignatures = Lists.newArrayList();
-    StringBuilder warnMessage = new StringBuilder();
-    if (!isFunctionCompatible(function, warnMessage)) {
-      LOG.warn("Skipping load of incompatible function: " +
-          function.getFunctionName() + ". " + warnMessage.toString());
-      return result;
-    }
-    String jarUri = function.getResourceUris().get(0).getUri();
-    Class<?> udfClass = null;
-    Path localJarPath = null;
-    try {
-      localJarPath = new Path(localLibraryPath_, UUID.randomUUID().toString() + ".jar");
-      try {
-        FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath);
-      } catch (IOException e) {
-        String errorMsg = "Error loading Java function: " + db + "." +
-            function.getFunctionName() + ". Couldn't copy " + jarUri +
-            " to local path: " + localJarPath.toString();
-        LOG.error(errorMsg, e);
-        throw new ImpalaRuntimeException(errorMsg);
-      }
-      URL[] classLoaderUrls = new URL[] {new URL(localJarPath.toString())};
-      URLClassLoader urlClassLoader = new URLClassLoader(classLoaderUrls);
-      udfClass = urlClassLoader.loadClass(function.getClassName());
-      // Check if the class is of UDF type. Currently we don't support other functions
-      // TODO: Remove this once we support Java UDAF/UDTF
-      if (FunctionUtils.getUDFClassType(udfClass) != FunctionUtils.UDFClassType.UDF) {
-        LOG.warn("Ignoring load of incompatible Java function: " +
-            function.getFunctionName() + " as " + FunctionUtils.getUDFClassType(udfClass)
-            + " is not a supported type. Only UDFs are supported");
-        return result;
-      }
-      // Load each method in the UDF class and create the corresponding Impala Function
-      // object.
-      for (Method m: udfClass.getMethods()) {
-        if (!m.getName().equals(UdfExecutor.UDF_FUNCTION_NAME)) continue;
-        Function fn = ScalarFunction.fromHiveFunction(db,
-            function.getFunctionName(), function.getClassName(),
-            m.getParameterTypes(), m.getReturnType(), jarUri);
-        if (fn == null) {
-          LOG.warn("Ignoring incompatible method: " + m.toString() + " during load of " +
-             "Hive UDF:" + function.getFunctionName() + " from " + udfClass);
-          continue;
-        }
-        if (!addedSignatures.contains(fn.signatureString())) {
-          result.add(fn);
-          addedSignatures.add(fn.signatureString());
-        }
-      }
-    } catch (ClassNotFoundException c) {
-      String errorMsg = "Error loading Java function: " + db + "." +
-          function.getFunctionName() + ". Symbol class " + udfClass +
-          "not found in Jar: " + jarUri;
-      LOG.error(errorMsg);
-      throw new ImpalaRuntimeException(errorMsg, c);
-    } catch (Exception e) {
-      LOG.error("Skipping function load: " + function.getFunctionName(), e);
-      throw new ImpalaRuntimeException("Error extracting functions", e);
-    } catch (LinkageError e) {
-      String errorMsg = "Error resolving dependencies for Java function: " + db + "." +
-          function.getFunctionName();
-      LOG.error(errorMsg);
-      throw new ImpalaRuntimeException(errorMsg, e);
-    } finally {
-      if (localJarPath != null) FileSystemUtil.deleteIfExists(localJarPath);
-    }
-    return result;
-  }
-
- /**
    * Extracts Impala functions stored in metastore db parameters and adds them to
    * the catalog cache.
    */
@@ -912,21 +775,13 @@ public class CatalogServiceCatalog extends Catalog {
       org.apache.hadoop.hive.metastore.api.Database msDb) {
     if (msDb == null || msDb.getParameters() == null) return;
     LOG.info("Loading native functions for database: " + db.getName());
-    TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
-    for (String key: msDb.getParameters().keySet()) {
-      if (!key.startsWith(Db.FUNCTION_INDEX_PREFIX)) continue;
-      try {
-        TFunction fn = new TFunction();
-        JniUtil.deserializeThrift(protocolFactory, fn,
-            Base64.decodeBase64(msDb.getParameters().get(key)));
-        Function addFn = Function.fromThrift(fn);
-        db.addFunction(addFn, false);
-        addFn.setCatalogVersion(incrementAndGetCatalogVersion());
-      } catch (ImpalaException e) {
-        LOG.error("Encountered an error during function load: key=" + key
-            + ",continuing", e);
-      }
+    List<Function> funcs = FunctionUtils.deserializeNativeFunctionsFromDbParams(
+        msDb.getParameters());
+    for (Function f : funcs) {
+      db.addFunction(f, false);
+      f.setCatalogVersion(incrementAndGetCatalogVersion());
     }
+
     LOG.info("Loaded native functions for database: " + db.getName());
   }
 
@@ -941,7 +796,9 @@ public class CatalogServiceCatalog extends Catalog {
     LOG.info("Loading Java functions for database: " + db.getName());
     for (org.apache.hadoop.hive.metastore.api.Function function: functions) {
       try {
-        for (Function fn: extractFunctions(db.getName(), function)) {
+        List<Function> fns = FunctionUtils.extractFunctions(db.getName(), function,
+            localLibraryPath_);
+        for (Function fn: fns) {
           db.addFunction(fn);
           fn.setCatalogVersion(incrementAndGetCatalogVersion());
         }

http://git-wip-us.apache.org/repos/asf/impala/blob/35713cac/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 0c3c2bd..cb98c85 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -18,7 +18,6 @@
 package org.apache.impala.catalog;
 
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +37,7 @@ import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TFunctionCategory;
+import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.PatternMatcher;
 
 import com.google.common.base.Preconditions;
@@ -187,45 +187,6 @@ public class Db extends CatalogObjectImpl implements FeDb {
     return HdfsTable.createCtasTarget(this, msTbl);
   }
 
-  /**
-   * Comparator that sorts function overloads. We want overloads to be always considered
-   * in a canonical order so that overload resolution in the case of multiple valid
-   * overloads does not depend on the order in which functions are added to the Db. The
-   * order is based on the PrimitiveType enum because this was the order used implicitly
-   * for builtin operators and functions in earlier versions of Impala.
-   */
-  private static class FunctionResolutionOrder implements Comparator<Function> {
-    @Override
-    public int compare(Function f1, Function f2) {
-      int numSharedArgs = Math.min(f1.getNumArgs(), f2.getNumArgs());
-      for (int i = 0; i < numSharedArgs; ++i) {
-        int cmp = typeCompare(f1.getArgs()[i], f2.getArgs()[i]);
-        if (cmp < 0) {
-          return -1;
-        } else if (cmp > 0) {
-          return 1;
-        }
-      }
-      // Put alternative with fewer args first.
-      if (f1.getNumArgs() < f2.getNumArgs()) {
-        return -1;
-      } else if (f1.getNumArgs() > f2.getNumArgs()) {
-        return 1;
-      }
-      return 0;
-    }
-
-    private int typeCompare(Type t1, Type t2) {
-      Preconditions.checkState(!t1.isComplexType());
-      Preconditions.checkState(!t2.isComplexType());
-      return Integer.compare(t1.getPrimitiveType().ordinal(),
-          t2.getPrimitiveType().ordinal());
-    }
-  }
-
-  private static final FunctionResolutionOrder FUNCTION_RESOLUTION_ORDER =
-      new FunctionResolutionOrder();
-
   @Override // FeDb
   public org.apache.hadoop.hive.metastore.api.Database getMetaStoreDb() {
     return thriftDb_.getMetastore_db();
@@ -253,31 +214,8 @@ public class Db extends CatalogObjectImpl implements FeDb {
     synchronized (functions_) {
       List<Function> fns = functions_.get(desc.functionName());
       if (fns == null) return null;
-
-      // First check for identical
-      for (Function f: fns) {
-        if (f.compare(desc, Function.CompareMode.IS_IDENTICAL)) return f;
-      }
-      if (mode == Function.CompareMode.IS_IDENTICAL) return null;
-
-      // Next check for indistinguishable
-      for (Function f: fns) {
-        if (f.compare(desc, Function.CompareMode.IS_INDISTINGUISHABLE)) return f;
-      }
-      if (mode == Function.CompareMode.IS_INDISTINGUISHABLE) return null;
-
-      // Next check for strict supertypes
-      for (Function f: fns) {
-        if (f.compare(desc, Function.CompareMode.IS_SUPERTYPE_OF)) return f;
-      }
-      if (mode == Function.CompareMode.IS_SUPERTYPE_OF) return null;
-
-      // Finally check for non-strict supertypes
-      for (Function f: fns) {
-        if (f.compare(desc, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF)) return f;
-      }
+      return FunctionUtils.resolveFunction(fns, desc, mode);
     }
-    return null;
   }
 
   public Function getFunction(String signatureString) {
@@ -345,7 +283,7 @@ public class Db extends CatalogObjectImpl implements FeDb {
       }
       if (addToDbParams && !addFunctionToDbParams(fn)) return false;
       fns.add(fn);
-      Collections.sort(fns, FUNCTION_RESOLUTION_ORDER);
+      Collections.sort(fns, FunctionUtils.FUNCTION_RESOLUTION_ORDER);
       return true;
     }
   }
@@ -472,31 +410,23 @@ public class Db extends CatalogObjectImpl implements FeDb {
    */
   @Override // FeDb
   public List<Function> getFunctions(String name) {
-    List<Function> result = Lists.newArrayList();
     Preconditions.checkNotNull(name);
     synchronized (functions_) {
-      if (!functions_.containsKey(name)) return result;
-      for (Function fn: functions_.get(name)) {
-        if (fn.userVisible()) result.add(fn);
-      }
+      List<Function> candidates = functions_.get(name);
+      if (candidates == null) return Lists.newArrayList();
+      return FunctionUtils.getVisibleFunctions(candidates);
     }
-    return result;
   }
 
   @Override
   public List<Function> getFunctions(TFunctionCategory category, String name) {
-    List<Function> result = Lists.newArrayList();
     Preconditions.checkNotNull(category);
     Preconditions.checkNotNull(name);
     synchronized (functions_) {
-      if (!functions_.containsKey(name)) return result;
-      for (Function fn: functions_.get(name)) {
-        if (fn.userVisible() && Function.categoryMatch(fn, category)) {
-          result.add(fn);
-        }
-      }
+      List<Function> candidates = functions_.get(name);
+      if (candidates == null) return Lists.newArrayList();
+      return FunctionUtils.getVisibleFunctionsInCategory(candidates, category);
     }
-    return result;
   }
 
   public TCatalogObject toTCatalogObject() {

http://git-wip-us.apache.org/repos/asf/impala/blob/35713cac/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 59acd9d..6ac7187 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -161,6 +162,25 @@ class DirectMetaProvider implements MetaProvider {
     return ret;
   }
 
+  @Override
+  public List<String> loadFunctionNames(String dbName) throws TException {
+    Preconditions.checkNotNull(dbName);
+    try (MetaStoreClient c = msClientPool_.getClient()) {
+      return ImmutableList.copyOf(c.getHiveClient().getFunctions(
+          dbName, /*pattern=*/null));
+    }
+  }
+
+
+  @Override
+  public Function getFunction(String dbName, String functionName) throws TException {
+    Preconditions.checkNotNull(dbName);
+    Preconditions.checkNotNull(functionName);
+
+    try (MetaStoreClient c = msClientPool_.getClient()) {
+      return c.getHiveClient().getFunction(dbName, functionName);
+    }
+  }
 
   @Override
   public List<ColumnStatisticsObj> loadTableColumnStatistics(String dbName,

http://git-wip-us.apache.org/repos/asf/impala/blob/35713cac/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
index 136093a..f71d206 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
@@ -18,6 +18,7 @@
 package org.apache.impala.catalog.local;
 
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -32,15 +33,23 @@ import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.Function.CompareMode;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TFunctionCategory;
+import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+
 /**
  * Database instance loaded from {@link LocalCatalog}.
  *
@@ -48,6 +57,8 @@ import com.google.common.collect.Maps;
  * each catalog instance.
  */
 class LocalDb implements FeDb {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalDb.class);
+
   private final LocalCatalog catalog_;
   /** The lower-case name of the database. */
   private final String name_;
@@ -59,6 +70,11 @@ class LocalDb implements FeDb {
    */
   private Map<String, LocalTable> tables_;
 
+  /**
+   * Map of function name to list of signatures for that function name.
+   */
+  private Map<String, FunctionOverloads> functions_;
+
   public LocalDb(LocalCatalog catalog, String dbName) {
     Preconditions.checkNotNull(catalog);
     Preconditions.checkNotNull(dbName);
@@ -156,40 +172,131 @@ class LocalDb implements FeDb {
 
   @Override
   public Function getFunction(Function desc, CompareMode mode) {
-    // TODO(todd): implement functions
-    return null;
+    loadFunction(desc.functionName());
+    FunctionOverloads funcs = functions_.get(desc.functionName());
+    if (funcs == null) return null;
+    return FunctionUtils.resolveFunction(funcs, desc, mode);
+  }
+
+  /**
+   * Populate the 'functions_' map if not already populated.
+   * This handles loading persistent native functions, since they're already
+   * present in the DB-level metadata, and loading the list of Java function
+   * names. The Java function signatures themselves are lazy-loaded as necessary
+   * by loadFunction(...).
+   */
+  private void loadFunctionNames() {
+    if (functions_ != null) return;
+
+    // Load the Java functions names. We don't load the actual metadata
+    // for them unless they get looked up.
+    List<String> javaFuncNames;
+
+    try {
+      javaFuncNames = catalog_.getMetaProvider().loadFunctionNames(name_);
+    } catch (TException e) {
+      throw new LocalCatalogException(String.format(
+          "Could not load functions for database '%s' from HMS", name_), e);
+    }
+
+    Map<String, FunctionOverloads> newMap = Maps.newHashMap();
+    for (String fn : javaFuncNames) {
+      newMap.put(fn, new FunctionOverloads(/*javaNeedsLoad=*/true));
+    }
+
+    // Load native functions.
+    List<Function> nativeFuncs = FunctionUtils.deserializeNativeFunctionsFromDbParams(
+        getMetaStoreDb().getParameters());
+    for (Function fn : nativeFuncs) {
+      String fnName = fn.functionName();
+      FunctionOverloads dst = newMap.get(fnName);
+      if (dst == null) {
+        // We know there were no Java functions by this name since we didn't see
+        // this function above in the HMS-derived function list.
+        dst = new FunctionOverloads(/*javaNeedsLoad=*/false);
+        newMap.put(fnName, dst);
+      }
+      dst.add(fn);
+    }
+
+    functions_ = newMap;
+  }
+
+  /**
+   * Ensure that the given function has been fully loaded.
+   * If this function does not exist, this is a no-op.
+   */
+  private void loadFunction(String functionName) {
+    loadFunctionNames();
+
+    FunctionOverloads overloads = functions_.get(functionName);
+    // If the function isn't in the map at all, then it doesn't exist.
+    if (overloads == null) return;
+
+    // If it's in the map, the native functions will already have been loaded,
+    // since we get that info from the DB params. But, we may still need to
+    // load Java info.
+    if (!overloads.javaNeedsLoad()) return;
+
+    org.apache.hadoop.hive.metastore.api.Function msFunc;
+    try {
+      msFunc = catalog_.getMetaProvider().getFunction(name_, functionName);
+    } catch (TException e) {
+      throw new LocalCatalogException(String.format(
+          "Could not load function '%s.%s' from HMS",
+          name_, functionName), e);
+    }
+
+    try {
+      overloads.setJavaFunctions(FunctionUtils.extractFunctions(name_, msFunc,
+          BackendConfig.INSTANCE.getBackendCfg().local_library_path));
+    } catch (ImpalaRuntimeException e) {
+      throw new LocalCatalogException(String.format(
+          "Could not load Java function definitions for '%s.%s'",
+          name_, functionName), e);
+    }
   }
 
   @Override
   public List<Function> getFunctions(String functionName) {
-    // TODO(todd): implement functions
-    return Collections.emptyList();
+    loadFunction(functionName);
+    FunctionOverloads funcs = functions_.get(functionName);
+    if (funcs == null) return Collections.emptyList();
+    return FunctionUtils.getVisibleFunctions(funcs);
   }
 
   @Override
   public List<Function> getFunctions(
-      TFunctionCategory category, String function) {
-    // TODO(todd): implement functions
-    return Collections.emptyList();
+      TFunctionCategory category, String functionName) {
+    loadFunction(functionName);
+    FunctionOverloads funcs = functions_.get(functionName);
+    if (funcs == null) return Collections.emptyList();
+    return FunctionUtils.getVisibleFunctionsInCategory(funcs, category);
   }
 
   @Override
   public List<Function> getFunctions(
-      TFunctionCategory category, PatternMatcher patternMatcher) {
-    // TODO(todd): implement functions
-    return Collections.emptyList();
+      TFunctionCategory category, PatternMatcher matcher) {
+    loadFunctionNames();
+    List<Function> result = Lists.newArrayList();
+    Iterable<String> fnNames = Iterables.filter(functions_.keySet(), matcher);
+    for (String fnName : fnNames) {
+      result.addAll(getFunctions(category, fnName));
+    }
+    return result;
   }
 
   @Override
   public int numFunctions() {
-    // TODO(todd): implement functions
-    return 0;
+    loadFunctionNames();
+    return functions_.size();
   }
 
   @Override
   public boolean containsFunction(String function) {
-    // TODO(todd): implement functions
-    return false;
+    loadFunctionNames();
+    // TODO(todd): does this need to be lower-cased here?
+    return functions_.containsKey(function);
   }
 
   @Override
@@ -202,4 +309,63 @@ class LocalDb implements FeDb {
   LocalCatalog getCatalog() {
     return catalog_;
   }
+
+  /**
+   * Captures the set of function overloads with a given name. This tracks the
+   * lazy-loading state of whether the Java signatures have been loaded yet,
+   * and ensures that only a properly-sorted list is exposed.
+   */
+  private static class FunctionOverloads implements Iterable<Function> {
+    /**
+     * The loaded functions, or null if no functions have not been loaded.
+     */
+    private List<Function> functions_;
+
+    /**
+     * The function list is lazily sorted only if it gets iterated, so that
+     * we don't pay any cost for sorting in the case when function names are
+     * loaded but a given function is not actually resolved in a query.
+     */
+    private boolean needsSort_ = false;
+
+    /**
+     * Whether Java functions still need to be loaded.
+     */
+    private boolean javaNeedsLoad_ = true;
+
+    FunctionOverloads(boolean javaNeedsLoad) {
+      this.javaNeedsLoad_ = javaNeedsLoad;
+    }
+
+    @Override
+    public Iterator<Function> iterator() {
+      Preconditions.checkState(!javaNeedsLoad_);
+      if (needsSort_) {
+        Collections.sort(functions_, FunctionUtils.FUNCTION_RESOLUTION_ORDER);
+        needsSort_ = false;
+      }
+      return functions_.iterator();
+    }
+
+    public void add(Function fn) {
+      if (functions_ == null) functions_ = Lists.newArrayList();
+      functions_.add(fn);
+      needsSort_ = true;
+    }
+
+    public boolean javaNeedsLoad() {
+      return javaNeedsLoad_;
+    }
+
+    public void setJavaFunctions(List<Function> fns) {
+      Preconditions.checkState(javaNeedsLoad_);
+      javaNeedsLoad_ = false;
+      needsSort_ |= !fns.isEmpty();
+      if (functions_ == null) {
+        functions_ = fns;
+        return;
+      }
+      functions_.addAll(fns);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/35713cac/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 9f433f8..75d389e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -61,6 +62,16 @@ interface MetaProvider {
       throws MetaException, TException;
 
   /**
+   * Retrieve the list of functions in the given database.
+   */
+  List<String> loadFunctionNames(String dbName) throws TException;
+
+  /**
+   * Retrieve the specified function from the metadata store.
+   */
+  Function getFunction(String dbName, String functionName) throws TException;
+
+  /**
    * Load the given partitions from the specified table.
    *
    * If a requested partition does not exist, no exception will be thrown.

http://git-wip-us.apache.org/repos/asf/impala/blob/35713cac/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index dc19e9f..40f604d 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -156,6 +156,7 @@ import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.thrift.TTruncateParams;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdateCatalogResponse;
+import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.log4j.Logger;
@@ -1080,7 +1081,8 @@ public class CatalogOpExecutor {
         Preconditions.checkState(fn instanceof ScalarFunction);
         org.apache.hadoop.hive.metastore.api.Function hiveFn =
             ((ScalarFunction)fn).toHiveFunction();
-        List<Function> funcs = CatalogServiceCatalog.extractFunctions(fn.dbName(), hiveFn);
+        List<Function> funcs = FunctionUtils.extractFunctions(fn.dbName(), hiveFn,
+            BackendConfig.INSTANCE.getBackendCfg().local_library_path);
         if (funcs.isEmpty()) {
           throw new CatalogException(
             "No compatible function signatures found in class: " + hiveFn.getClassName());

http://git-wip-us.apache.org/repos/asf/impala/blob/35713cac/fe/src/main/java/org/apache/impala/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/FunctionUtils.java b/fe/src/main/java/org/apache/impala/util/FunctionUtils.java
new file mode 100644
index 0000000..686bf18
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/FunctionUtils.java
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.ResourceType;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.Function;
+import org.apache.impala.catalog.Function.CompareMode;
+import org.apache.impala.catalog.ScalarFunction;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.JniUtil;
+import org.apache.impala.hive.executor.UdfExecutor;
+import org.apache.impala.thrift.TFunction;
+import org.apache.impala.thrift.TFunctionCategory;
+import org.apache.log4j.Logger;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public abstract class FunctionUtils {
+  public static final Logger LOG = Logger.getLogger(FunctionUtils.class);
+
+  public static final FunctionResolutionOrder FUNCTION_RESOLUTION_ORDER =
+      new FunctionResolutionOrder();
+
+  /**
+   * Returns a list of Impala Functions, one per compatible "evaluate" method in the UDF
+   * class referred to by the given Java function. This method copies the UDF Jar
+   * referenced by "function" to a temporary file in localLibraryPath_ and loads it
+   * into the jvm. Then we scan all the methods in the class using reflection and extract
+   * those methods and create corresponding Impala functions. Currently Impala supports
+   * only "JAR" files for symbols and also a single Jar containing all the dependent
+   * classes rather than a set of Jar files.
+   */
+  public static List<Function> extractFunctions(String db,
+      org.apache.hadoop.hive.metastore.api.Function function,
+      String localLibPath)
+      throws ImpalaRuntimeException{
+    List<Function> result = Lists.newArrayList();
+    List<String> addedSignatures = Lists.newArrayList();
+    StringBuilder warnMessage = new StringBuilder();
+    if (!FunctionUtils.isFunctionCompatible(function, warnMessage)) {
+      LOG.warn("Skipping load of incompatible function: " +
+          function.getFunctionName() + ". " + warnMessage.toString());
+      return result;
+    }
+    String jarUri = function.getResourceUris().get(0).getUri();
+    Class<?> udfClass = null;
+    Path localJarPath = null;
+    try {
+      // TODO(todd): cache these jars based on the mtime and file ID of the
+      // remote JAR? Can we share a cache with the backend?
+      localJarPath = new Path("file://" + localLibPath,
+          UUID.randomUUID().toString() + ".jar");
+      try {
+        FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath);
+      } catch (IOException e) {
+        String errorMsg = "Error loading Java function: " + db + "." +
+            function.getFunctionName() + ". Couldn't copy " + jarUri +
+            " to local path: " + localJarPath.toString();
+        LOG.error(errorMsg, e);
+        throw new ImpalaRuntimeException(errorMsg);
+      }
+      URL[] classLoaderUrls = new URL[] {new URL(localJarPath.toString())};
+      URLClassLoader urlClassLoader = new URLClassLoader(classLoaderUrls);
+      // TODO(todd): above class loader is leaked without closing.
+      udfClass = urlClassLoader.loadClass(function.getClassName());
+      // Check if the class is of UDF type. Currently we don't support other functions
+      // TODO: Remove this once we support Java UDAF/UDTF
+      if (org.apache.hadoop.hive.ql.exec.FunctionUtils.getUDFClassType(udfClass) !=
+          org.apache.hadoop.hive.ql.exec.FunctionUtils.UDFClassType.UDF) {
+        LOG.warn("Ignoring load of incompatible Java function: " +
+            function.getFunctionName() + " as " +
+            org.apache.hadoop.hive.ql.exec.FunctionUtils.getUDFClassType(udfClass)
+            + " is not a supported type. Only UDFs are supported");
+        return result;
+      }
+      // Load each method in the UDF class and create the corresponding Impala Function
+      // object.
+      for (Method m: udfClass.getMethods()) {
+        if (!m.getName().equals(UdfExecutor.UDF_FUNCTION_NAME)) continue;
+        Function fn = ScalarFunction.fromHiveFunction(db,
+            function.getFunctionName(), function.getClassName(),
+            m.getParameterTypes(), m.getReturnType(), jarUri);
+        if (fn == null) {
+          LOG.warn("Ignoring incompatible method: " + m.toString() + " during load of " +
+             "Hive UDF:" + function.getFunctionName() + " from " + udfClass);
+          continue;
+        }
+        if (!addedSignatures.contains(fn.signatureString())) {
+          result.add(fn);
+          addedSignatures.add(fn.signatureString());
+        }
+      }
+    } catch (ClassNotFoundException c) {
+      String errorMsg = "Error loading Java function: " + db + "." +
+          function.getFunctionName() + ". Symbol class " + function.getClassName() +
+          " not found in Jar: " + jarUri;
+      LOG.error(errorMsg);
+      throw new ImpalaRuntimeException(errorMsg, c);
+    } catch (Exception e) {
+      LOG.error("Skipping function load: " + function.getFunctionName(), e);
+      throw new ImpalaRuntimeException("Error extracting functions", e);
+    } catch (LinkageError e) {
+      String errorMsg = "Error resolving dependencies for Java function: " + db + "." +
+          function.getFunctionName();
+      LOG.error(errorMsg);
+      throw new ImpalaRuntimeException(errorMsg, e);
+    } finally {
+      if (localJarPath != null) FileSystemUtil.deleteIfExists(localJarPath);
+    }
+    return result;
+  }
+
+  public static List<Function> deserializeNativeFunctionsFromDbParams(
+      Map<String, String> dbParams) {
+    List<Function> results = Lists.newArrayList();
+    TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
+    for (Map.Entry<String, String> entry: dbParams.entrySet()) {
+      if (!entry.getKey().startsWith(Db.FUNCTION_INDEX_PREFIX)) continue;
+      try {
+        TFunction fn = new TFunction();
+        JniUtil.deserializeThrift(protocolFactory, fn,
+            Base64.decodeBase64(entry.getValue()));
+        results.add(Function.fromThrift(fn));
+      } catch (ImpalaException e) {
+        LOG.error("Encountered an error during function load: key=" +
+            entry.getKey() + ",continuing", e);
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Checks if the Hive function 'fn' is Impala compatible. A function is Impala
+   * compatible iff
+   *
+   * 1. The function is JAVA based,
+   * 2. Has exactly one binary resource associated (We don't support loading
+   *    dependencies yet) and
+   * 3. The binary is of type JAR.
+   *
+   * Returns true if compatible and false otherwise. In case of incompatible
+   * functions 'incompatMsg' has the reason for the incompatibility.
+   * */
+   private static boolean isFunctionCompatible(
+       org.apache.hadoop.hive.metastore.api.Function fn, StringBuilder incompatMsg) {
+    boolean isCompatible = true;
+    if (fn.getFunctionType() != FunctionType.JAVA) {
+      isCompatible = false;
+      incompatMsg.append("Function type: " + fn.getFunctionType().name()
+          + " is not supported. Only " + FunctionType.JAVA.name() + " functions "
+          + "are supported.");
+    } else if (fn.getResourceUrisSize() == 0) {
+      isCompatible = false;
+      incompatMsg.append("No executable binary resource (like a JAR file) is " +
+          "associated with this function. To fix this, recreate the function by " +
+          "specifying a 'location' in the function create statement.");
+    } else if (fn.getResourceUrisSize() != 1) {
+      isCompatible = false;
+      List<String> resourceUris = Lists.newArrayList();
+      for (ResourceUri resource: fn.getResourceUris()) {
+        resourceUris.add(resource.getUri());
+      }
+      incompatMsg.append("Impala does not support multiple Jars for dependencies."
+          + "(" + Joiner.on(",").join(resourceUris) + ") ");
+    } else if (fn.getResourceUris().get(0).getResourceType() != ResourceType.JAR) {
+      isCompatible = false;
+      incompatMsg.append("Function binary type: " +
+        fn.getResourceUris().get(0).getResourceType().name()
+        + " is not supported. Only " + ResourceType.JAR.name()
+        + " type is supported.");
+    }
+    return isCompatible;
+  }
+
+  public static Function resolveFunction(Iterable<Function> fns, Function desc,
+      CompareMode mode) {
+    Preconditions.checkNotNull(fns);
+    Preconditions.checkNotNull(desc);
+    Preconditions.checkNotNull(mode);
+
+    // First check for identical
+    for (Function f: fns) {
+      if (f.compare(desc, Function.CompareMode.IS_IDENTICAL)) return f;
+    }
+    if (mode == Function.CompareMode.IS_IDENTICAL) return null;
+
+    // Next check for indistinguishable
+    for (Function f: fns) {
+      if (f.compare(desc, Function.CompareMode.IS_INDISTINGUISHABLE)) return f;
+    }
+    if (mode == Function.CompareMode.IS_INDISTINGUISHABLE) return null;
+
+    // Next check for strict supertypes
+    for (Function f: fns) {
+      if (f.compare(desc, Function.CompareMode.IS_SUPERTYPE_OF)) return f;
+    }
+    if (mode == Function.CompareMode.IS_SUPERTYPE_OF) return null;
+
+    // Finally check for non-strict supertypes
+    for (Function f: fns) {
+      if (f.compare(desc, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF)) return f;
+    }
+    return null;
+  }
+
+  public static List<Function> getVisibleFunctionsInCategory(
+      Iterable<Function> candidates, TFunctionCategory category) {
+    List<Function> result = Lists.newArrayList();
+    for (Function fn: candidates) {
+      if (fn.userVisible() && Function.categoryMatch(fn, category)) {
+        result.add(fn);
+      }
+    }
+    return result;
+  }
+
+  public static List<Function> getVisibleFunctions(Iterable<Function> candidates) {
+    List<Function> result = Lists.newArrayList();
+    for (Function fn: candidates) {
+      if (fn.userVisible()) result.add(fn);
+    }
+    return result;
+  }
+
+  /**
+   * Comparator that sorts function overloads. We want overloads to be always considered
+   * in a canonical order so that overload resolution in the case of multiple valid
+   * overloads does not depend on the order in which functions are added to the Db. The
+   * order is based on the PrimitiveType enum because this was the order used implicitly
+   * for builtin operators and functions in earlier versions of Impala.
+   */
+  public static class FunctionResolutionOrder implements Comparator<Function> {
+    @Override
+    public int compare(Function f1, Function f2) {
+      int numSharedArgs = Math.min(f1.getNumArgs(), f2.getNumArgs());
+      for (int i = 0; i < numSharedArgs; ++i) {
+        int cmp = typeCompare(f1.getArgs()[i], f2.getArgs()[i]);
+        if (cmp < 0) {
+          return -1;
+        } else if (cmp > 0) {
+          return 1;
+        }
+      }
+      // Put alternative with fewer args first.
+      if (f1.getNumArgs() < f2.getNumArgs()) {
+        return -1;
+      } else if (f1.getNumArgs() > f2.getNumArgs()) {
+        return 1;
+      }
+      return 0;
+    }
+
+    private int typeCompare(Type t1, Type t2) {
+      Preconditions.checkState(!t1.isComplexType());
+      Preconditions.checkState(!t2.isComplexType());
+      return Integer.compare(t1.getPrimitiveType().ordinal(),
+          t2.getPrimitiveType().ordinal());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/35713cac/fe/src/main/java/org/apache/impala/util/PatternMatcher.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/PatternMatcher.java b/fe/src/main/java/org/apache/impala/util/PatternMatcher.java
index 37fa208..b097468 100644
--- a/fe/src/main/java/org/apache/impala/util/PatternMatcher.java
+++ b/fe/src/main/java/org/apache/impala/util/PatternMatcher.java
@@ -18,11 +18,11 @@
 package org.apache.impala.util;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 
 /**
@@ -30,7 +30,7 @@ import com.google.common.collect.Lists;
  * e.g. hive SHOW patterns, JDBC patterns).
  * It maps those patterns onto the java regex pattern objects.
  */
-public class PatternMatcher {
+public class PatternMatcher implements Predicate<String> {
   // Patterns to match against. A string is considered to match if it matches
   // any of the patterns.
   private List<Pattern> patterns_;
@@ -46,6 +46,11 @@ public class PatternMatcher {
     return false;
   }
 
+  @Override // Implementation of Predicate interface.
+  public boolean apply(String input) {
+    return matches(input);
+  }
+
   // Immutable pattern matcher that matches all
   private final static class MatchAllPatternMatcher extends PatternMatcher {
     MatchAllPatternMatcher() {}


[2/2] impala git commit: IMPALA-7333: remove MarkNeedsDeepCopy() in agg and BTS

Posted by ta...@apache.org.
IMPALA-7333: remove MarkNeedsDeepCopy() in agg and BTS

This takes advantage of work (e.g. IMPALA-3200, IMPALA-5844)
to remove a couple of uses of the API.

Testing:
Ran core, ASAN and exhaustive builds.

Added unit tests to directly test the attaching behaviour.

Change-Id: I5f5b8a418d4816f603a64da6287ec392dbd4603f
Reviewed-on: http://gerrit.cloudera.org:8080/11156
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4af3a785
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4af3a785
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4af3a785

Branch: refs/heads/master
Commit: 4af3a7853e9d450675a61bff5b031a12e7a05172
Parents: 35713ca
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jul 19 11:20:40 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Aug 10 09:22:11 2018 +0000

----------------------------------------------------------------------
 be/src/exec/grouping-aggregator.cc           |   8 +-
 be/src/runtime/buffered-tuple-stream-test.cc | 249 ++++++++++++++++++++--
 be/src/runtime/buffered-tuple-stream.cc      | 145 ++++++++-----
 be/src/runtime/buffered-tuple-stream.h       | 138 +++++++-----
 be/src/runtime/row-batch.h                   |   1 +
 be/src/runtime/tuple.h                       |   4 +
 6 files changed, 408 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4af3a785/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 092ecfd..42d6be8 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -294,8 +294,12 @@ Status GroupingAggregator::GetRowsFromPartition(
 
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   partition_eos_ = ReachedLimit();
-  if (output_iterator_.AtEnd()) row_batch->MarkNeedsDeepCopy();
-
+  if (partition_eos_ || output_iterator_.AtEnd()) {
+    // Attach all buffers referenced by previously-returned rows. On the next GetNext()
+    // call we will close the partition.
+    output_partition_->aggregated_row_stream->Close(
+        row_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/4af3a785/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index ef66824..6ff9805 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -398,6 +398,22 @@ class SimpleTupleStreamTest : public testing::Test {
 
   void TestTransferMemory(bool pinned_stream, bool read_write);
 
+  void TestAttachMemory(bool pinned_stream, bool attach_on_read);
+
+  void TestFlushResourcesReadWrite(bool pinned_stream, bool attach_on_read);
+
+  /// Helper for TestFlushResourcesReadWrite() to write and read back rows from
+  /// *stream. 'append_batch_size' is the number of rows to append at a time before
+  /// reading them back. *num_buffers_attached tracks the number of buffers attached
+  /// to the output batch.
+  void AppendToReadWriteStream(int64_t append_batch_size, int64_t buffer_size,
+      int* num_buffers_attached, BufferedTupleStream* stream);
+
+  // Helper for AppendToReadWriteStream() to verify 'out_batch' contents. The value of
+  // row i of 'out_batch' is expected to be the same as the row at index
+  // (i + start_index) % out_batch->num_rows() of 'in_batch'.
+  void VerifyReadWriteBatch(RowBatch* in_batch, RowBatch* out_batch, int64_t start_index);
+
   // Helper to writes 'row' comprised of only string slots to 'data'. The expected
   // length of the data written is 'expected_len'.
   void WriteStringRow(const RowDescriptor* row_desc, TupleRow* row, int64_t fixed_size,
@@ -649,13 +665,13 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
   ASSERT_TRUE(pinned);
 
   // Read and verify result a few times. We should be able to reread the stream if
-  // we don't use delete on read mode.
+  // we don't use attach on read mode.
   int read_iters = 3;
   for (int i = 0; i < read_iters; ++i) {
-    bool delete_on_read = i == read_iters - 1;
+    bool attach_on_read = i == read_iters - 1;
     if (i > 0 || !read_write) {
       bool got_read_reservation;
-      ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+      ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
       ASSERT_TRUE(got_read_reservation);
     }
 
@@ -670,15 +686,13 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
     }
   }
 
-  // After delete_on_read, all blocks aside from the last should be deleted.
-  // Note: this should really be 0, but the BufferedTupleStream returns eos before
-  // deleting the last block, rather than after, so the last block isn't deleted
-  // until the stream is closed.
-  ASSERT_EQ(stream.BytesPinned(false), buffer_size);
+  // After attach_on_read, all buffers should have been attached to the output batches
+  // on previous GetNext() calls.
+  ASSERT_EQ(0, stream.BytesPinned(false));
 
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 
-  ASSERT_EQ(stream.BytesPinned(false), 0);
+  ASSERT_EQ(0, stream.BytesPinned(false));
 }
 
 TEST_F(SimpleTupleStreamTest, UnpinPin) {
@@ -765,6 +779,205 @@ TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) {
   TestTransferMemory(false, false);
 }
 
+/// Test iteration over a stream with and without attaching memory.
+void SimpleTupleStreamTest::TestAttachMemory(bool pin_stream, bool attach_on_read) {
+  // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
+  // make the batch at capacity.
+  int buffer_size = 4 * 1024;
+  Init(100 * buffer_size);
+
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+  ASSERT_OK(stream.Init(-1, pin_stream));
+  bool got_write_reservation;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+  ASSERT_TRUE(got_write_reservation);
+  RowBatch* in_batch = CreateIntBatch(0, 1024, false);
+
+  // Construct a stream with 4 pages.
+  const int total_num_pages = 4;
+  while (stream.byte_size() < total_num_pages * buffer_size) {
+    Status status;
+    for (int i = 0; i < in_batch->num_rows(); ++i) {
+      bool ret = stream.AddRow(in_batch->GetRow(i), &status);
+      EXPECT_TRUE(ret);
+      ASSERT_OK(status);
+    }
+  }
+
+  RowBatch* out_batch = pool_.Add(new RowBatch(int_desc_, 100, &tracker_));
+  int num_buffers_attached = 0;
+  int num_flushes = 0;
+  int64_t num_rows_returned = 0;
+  bool got_read_reservation;
+  ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
+  ASSERT_TRUE(got_read_reservation);
+  bool eos;
+  do {
+    ASSERT_EQ(0, out_batch->num_buffers());
+    ASSERT_OK(stream.GetNext(out_batch, &eos));
+    EXPECT_LE(out_batch->num_buffers(), 1) << "Should only attach one buffer at a time";
+    if (out_batch->num_buffers() > 0) {
+      EXPECT_TRUE(out_batch->AtCapacity()) << "Flush resources flag should have been set";
+    }
+    num_buffers_attached += out_batch->num_buffers();
+    for (int i = 0; i < out_batch->num_rows(); ++i) {
+      int slot_offset = int_desc_->tuple_descriptors()[0]->slots()[0]->tuple_offset();
+      TupleRow* in_row = in_batch->GetRow(num_rows_returned % in_batch->num_rows());
+      EXPECT_EQ(*in_row->GetTuple(0)->GetIntSlot(slot_offset),
+          *out_batch->GetRow(i)->GetTuple(0)->GetIntSlot(slot_offset));
+      ++num_rows_returned;
+    }
+    num_flushes += out_batch->flush_mode() == RowBatch::FlushMode::FLUSH_RESOURCES;
+    out_batch->Reset();
+  } while (!eos);
+
+  if (attach_on_read) {
+    EXPECT_EQ(4, num_buffers_attached) << "All buffers attached during iteration.";
+  } else {
+    EXPECT_EQ(0, num_buffers_attached) << "No buffers attached during iteration.";
+  }
+  if (attach_on_read || !pin_stream) EXPECT_EQ(4, num_flushes);
+  out_batch->Reset();
+  stream.Close(out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  if (attach_on_read) {
+    EXPECT_EQ(0, out_batch->num_buffers());
+  } else if (pin_stream) {
+    // All buffers should be attached.
+    EXPECT_EQ(4, out_batch->num_buffers());
+  } else {
+    // Buffer from last pinned page should be attached.
+    EXPECT_EQ(1, out_batch->num_buffers());
+  }
+  in_batch->Reset();
+  out_batch->Reset();
+}
+
+TEST_F(SimpleTupleStreamTest, TestAttachMemoryPinned) {
+  TestAttachMemory(true, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestNoAttachMemoryPinned) {
+  TestAttachMemory(true, false);
+}
+
+TEST_F(SimpleTupleStreamTest, TestAttachMemoryUnpinned) {
+  TestAttachMemory(false, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestNoAttachMemoryUnpinned) {
+  TestAttachMemory(false, false);
+}
+
+// Test for advancing the read/write page with resource flushing.
+void SimpleTupleStreamTest::TestFlushResourcesReadWrite(
+    bool pin_stream, bool attach_on_read) {
+  // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
+  // make the batch at capacity.
+  const int BUFFER_SIZE = 512;
+  const int BATCH_SIZE = 100;
+  // For unpinned streams, we should be able to iterate with only two buffers.
+  const int MAX_PINNED_PAGES = pin_stream ? 1000 : 2;
+  Init(MAX_PINNED_PAGES * BUFFER_SIZE);
+
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, &client_, BUFFER_SIZE, BUFFER_SIZE);
+  ASSERT_OK(stream.Init(-1, pin_stream));
+  bool got_reservation;
+  ASSERT_OK(stream.PrepareForReadWrite(attach_on_read, &got_reservation));
+  ASSERT_TRUE(got_reservation);
+  int num_buffers_attached = 0;
+  /// Read over the page in different increments.
+  for (int append_batch_size : {1, 10, 100, 1000}) {
+    AppendToReadWriteStream(
+        append_batch_size, BUFFER_SIZE, &num_buffers_attached, &stream);
+  }
+
+  if (attach_on_read) {
+    EXPECT_EQ(stream.byte_size() / BUFFER_SIZE - 1, num_buffers_attached)
+        << "All buffers except the current write page should have been attached";
+  } else {
+    EXPECT_EQ(0, num_buffers_attached);
+  }
+
+  RowBatch* final_out_batch = pool_.Add(new RowBatch(int_desc_, BATCH_SIZE, &tracker_));
+  stream.Close(final_out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  final_out_batch->Reset();
+}
+
+void SimpleTupleStreamTest::AppendToReadWriteStream(int64_t append_batch_size,
+    int64_t buffer_size, int* num_buffers_attached, BufferedTupleStream* stream) {
+  RowBatch* in_batch = CreateIntBatch(0, BATCH_SIZE, false);
+
+  /// Accumulate row batches until we see a flush. The contents of the batches should
+  /// remain valid until reset or delete trailing batches.
+  vector<unique_ptr<RowBatch>> out_batches;
+  // The start row index of each batch in 'out_batches'.
+  vector<int64_t> out_batch_start_indices;
+  // Iterate over at least 10 pages.
+  int64_t start_byte_size = stream->byte_size();
+  while (stream->byte_size() - start_byte_size < 10 * buffer_size) {
+    Status status;
+    for (int i = 0; i < append_batch_size; ++i) {
+      bool ret = stream->AddRow(
+          in_batch->GetRow(stream->num_rows() % in_batch->num_rows()), &status);
+      EXPECT_TRUE(ret);
+      ASSERT_OK(status);
+    }
+    int64_t rows_read = 0;
+    bool eos;
+    while (rows_read < append_batch_size) {
+      out_batches.emplace_back(new RowBatch(int_desc_, BATCH_SIZE, &tracker_));
+      out_batch_start_indices.push_back(stream->rows_returned());
+      ASSERT_OK(stream->GetNext(out_batches.back().get(), &eos));
+      // Verify the contents of all valid batches to make sure that they haven't become
+      // invalid.
+      LOG(INFO) << "Verifying " << out_batches.size() << " batches";
+      for (int i = 0; i < out_batches.size(); ++i) {
+        VerifyReadWriteBatch(in_batch, out_batches[i].get(), out_batch_start_indices[i]);
+      }
+      *num_buffers_attached += out_batches.back()->num_buffers();
+      rows_read += out_batches.back()->num_rows();
+      EXPECT_EQ(rows_read == append_batch_size, eos);
+      if (out_batches.back().get()->flush_mode()
+          == RowBatch::FlushMode::FLUSH_RESOURCES) {
+        out_batches.clear();
+        out_batch_start_indices.clear();
+      }
+    }
+    EXPECT_EQ(append_batch_size, rows_read);
+    EXPECT_EQ(true, eos);
+  }
+  in_batch->Reset();
+}
+
+void SimpleTupleStreamTest::VerifyReadWriteBatch(
+    RowBatch* in_batch, RowBatch* out_batch, int64_t start_index) {
+  int slot_offset = int_desc_->tuple_descriptors()[0]->slots()[0]->tuple_offset();
+  int64_t row_index = start_index;
+  for (int i = 0; i < out_batch->num_rows(); ++i) {
+    TupleRow* in_row = in_batch->GetRow(row_index++ % in_batch->num_rows());
+    EXPECT_EQ(*in_row->GetTuple(0)->GetIntSlot(slot_offset),
+        *out_batch->GetRow(i)->GetTuple(0)->GetIntSlot(slot_offset));
+  }
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWritePinnedAttach) {
+  TestFlushResourcesReadWrite(true, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWritePinnedNoAttach) {
+  TestFlushResourcesReadWrite(true, false);
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWriteUnpinnedAttach) {
+  TestFlushResourcesReadWrite(false, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWriteUnpinnedNoAttach) {
+  TestFlushResourcesReadWrite(false, false);
+}
+
 // Test that tuple stream functions if it references strings outside stream. The
 // aggregation node relies on this since it updates tuples in-place.
 TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
@@ -805,11 +1018,11 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
 
   DCHECK_EQ(rows_added, stream.num_rows());
 
-  for (int delete_on_read = 0; delete_on_read <= 1; ++delete_on_read) {
+  for (int attach_on_read = 0; attach_on_read <= 1; ++attach_on_read) {
     // Keep stream in memory and test we can read ok.
     vector<StringValue> results;
     bool got_read_reservation;
-    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+    ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
     ASSERT_TRUE(got_read_reservation);
     ReadValues(&stream, string_desc_, &results);
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
@@ -934,8 +1147,8 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   vector<uint8_t> tuple_mem(tuple_desc->byte_size());
   Tuple* write_tuple = reinterpret_cast<Tuple*>(tuple_mem.data());
   write_row->SetTuple(0, write_tuple);
-  StringValue* write_str = reinterpret_cast<StringValue*>(
-      write_tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+  StringValue* write_str =
+      write_tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
   // Make the string large enough to fill a page.
   const int64_t string_len = BIG_ROW_BYTES - tuple_desc->byte_size();
   vector<char> data(string_len);
@@ -961,8 +1174,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
     EXPECT_EQ(1, read_batch.num_rows());
     EXPECT_TRUE(eos);
     Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
-    StringValue* str = reinterpret_cast<StringValue*>(
-        tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+    StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
     EXPECT_EQ(string_len, str->len);
     for (int j = 0; j < string_len; ++j) {
       EXPECT_EQ(i, str->ptr[j]) << j;
@@ -988,8 +1200,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
     EXPECT_EQ(1, read_batch.num_rows());
     EXPECT_EQ(eos, i == MAX_BUFFERS) << i;
     Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
-    StringValue* str = reinterpret_cast<StringValue*>(
-        tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+    StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
     EXPECT_EQ(string_len, str->len);
     for (int j = 0; j < string_len; ++j) {
       ASSERT_EQ(i, str->ptr[j]) << j;
@@ -1117,10 +1328,10 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
   }
 
   for (int i = 0; i < 3; ++i) {
-    bool delete_on_read = i == 2;
+    bool attach_on_read = i == 2;
     vector<StringValue> results;
     bool got_read_reservation;
-    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+    ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
     ASSERT_TRUE(got_read_reservation);
     ReadValues(&stream, string_desc_, &results);
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);

http://git-wip-us.apache.org/repos/asf/impala/blob/4af3a785/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 9326507..71175d1 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -47,36 +47,20 @@ using namespace impala;
 using namespace strings;
 
 using BufferHandle = BufferPool::BufferHandle;
+using FlushMode = RowBatch::FlushMode;
 
 BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client,
     int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
   : state_(state),
     desc_(row_desc),
-    node_id_(-1),
     buffer_pool_(state->exec_env()->buffer_pool()),
     buffer_pool_client_(buffer_pool_client),
-    num_pages_(0),
-    total_byte_size_(0),
-    has_read_iterator_(false),
     read_page_reservation_(buffer_pool_client_),
-    read_page_rows_returned_(-1),
-    read_ptr_(nullptr),
-    read_end_ptr_(nullptr),
-    write_ptr_(nullptr),
-    write_end_ptr_(nullptr),
-    rows_returned_(0),
-    has_write_iterator_(false),
-    write_page_(nullptr),
     write_page_reservation_(buffer_pool_client_),
-    bytes_pinned_(0),
-    num_rows_(0),
     default_page_len_(default_page_len),
     max_page_len_(max_page_len),
-    has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
-    delete_on_read_(false),
-    closed_(false),
-    pinned_(true) {
+    has_nullable_tuple_(row_desc->IsAnyTupleNullable()) {
   DCHECK_GE(max_page_len, default_page_len);
   DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len;
   DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len;
@@ -110,10 +94,6 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
   }
 }
 
-BufferedTupleStream::~BufferedTupleStream() {
-  DCHECK(closed_);
-}
-
 void BufferedTupleStream::CheckConsistencyFull() const {
   CheckConsistencyFast();
   // The below checks require iterating over all the pages in the stream.
@@ -139,11 +119,13 @@ void BufferedTupleStream::CheckConsistencyFast() const {
   DCHECK(has_read_iterator() || read_page_ == pages_.end());
   if (read_page_ != pages_.end()) {
     CheckPageConsistency(&*read_page_);
-    DCHECK(read_page_->is_pinned());
-    DCHECK(read_page_->retrieved_buffer);
-    // Can't check read buffer without affecting behaviour, because a read may be in
-    // flight and this would required blocking on that write.
-    DCHECK_GE(read_end_ptr_, read_ptr_);
+    if (!read_page_->attached_to_output_batch) {
+      DCHECK(read_page_->is_pinned());
+      DCHECK(read_page_->retrieved_buffer);
+      // Can't check read buffer without affecting behaviour, because a read may be in
+      // flight and this would required blocking on that write.
+      DCHECK_GE(read_end_ptr_, read_ptr_);
+    }
   }
   if (NeedReadReservation()) {
     DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation())
@@ -159,6 +141,12 @@ void BufferedTupleStream::CheckConsistencyFast() const {
 }
 
 void BufferedTupleStream::CheckPageConsistency(const Page* page) const {
+  if (page->attached_to_output_batch) {
+    /// Read page was just attached to output batch.
+    DCHECK(is_read_page(page)) << page->DebugString();
+    DCHECK(!page->handle.is_open());
+    return;
+  }
   DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << DebugString();
   // Only one large row per page.
   if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1);
@@ -170,7 +158,7 @@ string BufferedTupleStream::DebugString() const {
   stringstream ss;
   ss << "BufferedTupleStream num_rows=" << num_rows_
      << " rows_returned=" << rows_returned_ << " pinned=" << pinned_
-     << " delete_on_read=" << delete_on_read_ << " closed=" << closed_ << "\n"
+     << " attach_on_read=" << attach_on_read_ << " closed=" << closed_ << "\n"
      << " bytes_pinned=" << bytes_pinned_ << " has_write_iterator=" << has_write_iterator_
      << " write_page=" << write_page_ << " has_read_iterator=" << has_read_iterator_
      << " read_page=";
@@ -201,8 +189,23 @@ string BufferedTupleStream::DebugString() const {
   return ss.str();
 }
 
+void BufferedTupleStream::Page::AttachBufferToBatch(
+    BufferedTupleStream* parent, RowBatch* batch, FlushMode flush) {
+  DCHECK(is_pinned());
+  DCHECK(retrieved_buffer);
+  parent->bytes_pinned_ -= len();
+  // ExtractBuffer() cannot fail because the buffer is already in memory.
+  BufferPool::BufferHandle buffer;
+  Status status =
+      parent->buffer_pool_->ExtractBuffer(parent->buffer_pool_client_, &handle, &buffer);
+  DCHECK(status.ok());
+  batch->AddBuffer(parent->buffer_pool_client_, move(buffer), flush);
+  attached_to_output_batch = true;
+}
+
 string BufferedTupleStream::Page::DebugString() const {
-  return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows);
+  return Substitute("$0 num_rows=$1 retrived_buffer=$2 attached_to_output_batch=$3",
+      handle.DebugString(), num_rows, retrieved_buffer, attached_to_output_batch);
 }
 
 Status BufferedTupleStream::Init(int node_id, bool pinned) {
@@ -214,7 +217,7 @@ Status BufferedTupleStream::Init(int node_id, bool pinned) {
 Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
   // This must be the first iterator created.
   DCHECK(pages_.empty());
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
   CHECK_CONSISTENCY_FULL();
@@ -229,10 +232,10 @@ Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
 }
 
 Status BufferedTupleStream::PrepareForReadWrite(
-    bool delete_on_read, bool* got_reservation) {
+    bool attach_on_read, bool* got_reservation) {
   // This must be the first iterator created.
   DCHECK(pages_.empty());
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
   CHECK_CONSISTENCY_FULL();
@@ -243,20 +246,17 @@ Status BufferedTupleStream::PrepareForReadWrite(
   // Save reservation for both the read and write iterators.
   buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_);
   buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
-  RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read));
+  RETURN_IF_ERROR(PrepareForReadInternal(attach_on_read));
   return Status::OK();
 }
 
-void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) {
+void BufferedTupleStream::Close(RowBatch* batch, FlushMode flush) {
   for (Page& page : pages_) {
+    if (page.attached_to_output_batch) continue; // Already returned.
     if (batch != nullptr && page.retrieved_buffer) {
       // Subtle: We only need to attach buffers from pages that we may have returned
-      // references to. ExtractBuffer() cannot fail for these pages because the data
-      // is guaranteed to already be in -memory.
-      BufferPool::BufferHandle buffer;
-      Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
-      DCHECK(status.ok());
-      batch->AddBuffer(buffer_pool_client_, move(buffer), flush);
+      // references to.
+      page.AttachBufferToBatch(this, batch, flush);
     } else {
       buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
     }
@@ -271,7 +271,9 @@ void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) {
 
 int64_t BufferedTupleStream::CalcBytesPinned() const {
   int64_t result = 0;
-  for (const Page& page : pages_) result += page.pin_count() * page.len();
+  for (const Page& page : pages_) {
+    if (!page.attached_to_output_batch) result += page.pin_count() * page.len();
+  }
   return result;
 }
 
@@ -282,6 +284,7 @@ Status BufferedTupleStream::PinPage(Page* page) {
 }
 
 int BufferedTupleStream::ExpectedPinCount(bool stream_pinned, const Page* page) const {
+  DCHECK(!page->attached_to_output_batch);
   return (stream_pinned || is_read_page(page) || is_write_page(page)) ? 1 : 0;
 }
 
@@ -483,12 +486,11 @@ Status BufferedTupleStream::NextReadPage() {
         && !NeedReadReservation(pinned_, num_pages_, true, true)) {
       buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
     }
-  } else if (delete_on_read_) {
+  } else if (attach_on_read_) {
     DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " "
                                          << DebugString();
     DCHECK_NE(&*read_page_, write_page_);
-    bytes_pinned_ -= pages_.front().len();
-    buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle);
+    DCHECK(read_page_->attached_to_output_batch);
     pages_.pop_front();
     --num_pages_;
     read_page_ = pages_.begin();
@@ -557,12 +559,13 @@ void BufferedTupleStream::InvalidateReadIterator() {
   if (read_page_reservation_.GetReservation() > 0) {
     buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
   }
-  // It is safe to re-read a delete-on-read stream if no rows were read and no pages
+  // It is safe to re-read an attach-on-read stream if no rows were read and no pages
   // were therefore deleted.
-  if (rows_returned_ == 0) delete_on_read_ = false;
+  DCHECK(attach_on_read_ == false || rows_returned_ == 0);
+  if (rows_returned_ == 0) attach_on_read_ = false;
 }
 
-Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_reservation) {
+Status BufferedTupleStream::PrepareForRead(bool attach_on_read, bool* got_reservation) {
   CHECK_CONSISTENCY_FULL();
   InvalidateWriteIterator();
   InvalidateReadIterator();
@@ -570,12 +573,12 @@ Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_reserv
   *got_reservation = pinned_ || pages_.empty()
       || buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
   if (!*got_reservation) return Status::OK();
-  return PrepareForReadInternal(delete_on_read);
+  return PrepareForReadInternal(attach_on_read);
 }
 
-Status BufferedTupleStream::PrepareForReadInternal(bool delete_on_read) {
+Status BufferedTupleStream::PrepareForReadInternal(bool attach_on_read) {
   DCHECK(!closed_);
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   DCHECK(!has_read_iterator());
 
   has_read_iterator_ = true;
@@ -599,7 +602,7 @@ Status BufferedTupleStream::PrepareForReadInternal(bool delete_on_read) {
   }
   read_page_rows_returned_ = 0;
   rows_returned_ = 0;
-  delete_on_read_ = delete_on_read;
+  attach_on_read_ = attach_on_read;
   CHECK_CONSISTENCY_FULL();
   return Status::OK();
 }
@@ -708,6 +711,15 @@ Status BufferedTupleStream::GetNextInternal(
 
   if (UNLIKELY(read_page_ == pages_.end()
           || read_page_rows_returned_ == read_page_->num_rows)) {
+    if (read_page_ != pages_.end() && attach_on_read_
+        && !read_page_->attached_to_output_batch) {
+      DCHECK(has_write_iterator());
+      // We're in a read-write stream in the case where we're at the end of the read page
+      // but the buffer was not attached on the last GetNext() call because the write
+      // iterator had not yet advanced.
+      read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
+      return Status::OK();
+    }
     // Get the next page in the stream (or the first page if read_page_ was not yet
     // initialized.) We need to do this at the beginning of the GetNext() call to ensure
     // the buffer management semantics. NextReadPage() may unpin or delete the buffer
@@ -729,7 +741,7 @@ Status BufferedTupleStream::GetNextInternal(
   // null tuple indicator.
   if (FILL_FLAT_ROWS) {
     DCHECK(flat_rows != nullptr);
-    DCHECK(!delete_on_read_);
+    DCHECK(!attach_on_read_);
     DCHECK_EQ(batch->num_rows(), 0);
     flat_rows->clear();
     flat_rows->reserve(rows_to_fill);
@@ -768,11 +780,28 @@ Status BufferedTupleStream::GetNextInternal(
   rows_returned_ += rows_to_fill;
   read_page_rows_returned_ += rows_to_fill;
   *eos = (rows_returned_ == num_rows_);
-  if (read_page_rows_returned_ == read_page_->num_rows && (!pinned_ || delete_on_read_)) {
-    // No more data in this page. The batch must be immediately returned up the operator
-    // tree and deep copied so that NextReadPage() can reuse the read page's buffer.
-    // TODO: IMPALA-4179 - instead attach the buffer and flush the resources.
-    batch->MarkNeedsDeepCopy();
+  if (read_page_rows_returned_ == read_page_->num_rows) {
+    // No more data in this page. NextReadPage() may need to reuse the reservation
+    // currently used for 'read_page_' so we may need to flush resources. When
+    // 'attach_on_read_' is true, we're returning the buffer. Otherwise the buffer will
+    // be unpinned later but we're returning a reference to the memory so we need to
+    // signal to the caller that the resources are going away. Note that if there is a
+    // read-write page it is not safe to attach the buffer yet because more rows may be
+    // appended to the page.
+    if (attach_on_read_) {
+      if (!has_read_write_page()) {
+        // Safe to attach because we already called GetBuffer() in NextReadPage().
+        // TODO: always flushing for pinned streams is overkill since we may not need
+        // to reuse the reservation immediately. Changing this may require modifying
+        // callers of this class.
+        read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
+      }
+    } else if (!pinned_) {
+      // Flush resources so that we can safely unpin the page on the next GetNext() call.
+      // Note that if this is a read/write page we might not actually do the advance on
+      // the next call to GetNext(). In that case the flush is still safe to do.
+      batch->MarkFlushResources();
+    }
   }
   if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
   DCHECK_LE(read_ptr_, read_end_ptr_);
@@ -1028,7 +1057,7 @@ void BufferedTupleStream::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const
   DCHECK(row != nullptr);
   DCHECK(!closed_);
   DCHECK(is_pinned());
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   uint8_t* data = flat_row;
   return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) :
                                UnflattenTupleRow<false>(&data, row);

http://git-wip-us.apache.org/repos/asf/impala/blob/4af3a785/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index 565b5fa..4090ea8 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -93,12 +93,9 @@ class TupleRow;
 /// buffer is needed to keep the row being processed in-memory, but only default-sized
 /// buffers are needed for the other streams being written.
 ///
-/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag
-/// to PrepareForRead() which deletes the stream's pages as it does a final read
-/// pass over the stream.
-///
-/// TODO: IMPALA-4179: the buffer management can be simplified once we can attach
-/// buffers to RowBatches.
+/// The tuple stream also supports a 'attach_on_read' mode, enabled by passing a flag
+/// to PrepareForRead() which attaches the stream's pages to the output batch as it
+/// does a final destructive read pass over the stream.
 ///
 /// Page layout:
 /// Rows are stored back to back starting at the first byte of each page's buffer, with
@@ -162,11 +159,11 @@ class TupleRow;
 /// Read:
 ///   1. Unpinned: Only a single read page is pinned at a time. This means that only
 ///     enough reservation to pin a single page is needed to read the stream, regardless
-///     of the stream's size. Each page is deleted or unpinned (if delete on read is true
+///     of the stream's size. Each page is attached or unpinned (if attach on read is true
 ///     or false respectively) before advancing to the next page.
 ///   2. Pinned: All pages in the stream are pinned so do not need to be pinned or
-///     unpinned when reading from the stream. If delete on read is true, pages are
-///     deleted after being read. If the stream was previously unpinned, the page's data
+///     unpinned when reading from the stream. If attach on read is true, pages are
+///     attached after being read. If the stream was previously unpinned, the page's data
 ///     may not yet be in memory - reading from the stream can block on I/O or fail with
 ///     an I/O error.
 /// Write:
@@ -179,12 +176,17 @@ class TupleRow;
 ///     or free up other memory before retrying.
 ///
 /// Memory lifetime of rows read from stream:
-/// If the stream is pinned and delete on read is false, it is valid to access any tuples
-/// returned via GetNext() until the stream is unpinned. If the stream is unpinned or
-/// delete on read is true, then the batch returned from GetNext() may have the
-/// needs_deep_copy flag set, which means that any tuple memory returned so far from the
-/// stream may be freed on the next call to GetNext().
-/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch.
+/// There are several cases.
+/// 1. If the stream is pinned and attach on read is false, it is valid to access any
+///    tuples returned via GetNext() until the stream is unpinned.
+/// 2. If the stream is in attach on read mode, all buffers referenced by returned rows
+///    are attached to the batches by that GetNext() call or a subsequent one. The
+///    caller is responsible for managing the lifetime of those buffers.
+/// 3. If the stream is unpinned and not in attach on read mode, then the batch returned
+///    from GetNext() may have the FLUSH_RESOURCES flag set, which means that any tuple
+///    memory returned so far from the stream may be freed on the next call to GetNext().
+///    It is *not* safe to return references to rows returned in this mode outside of
+///    the ExecNode.
 ///
 /// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd():
 /// The BufferedTupleStream supports allocation of uninitialized rows with
@@ -197,8 +199,7 @@ class TupleRow;
 /// will not be modified until the stream is read via GetNext().
 /// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow().
 ///
-/// TODO: we need to be able to do read ahead for pages. We need some way to indicate a
-/// page will need to be pinned soon.
+/// TODO: prefetching for pages could speed up iteration over unpinned streams.
 class BufferedTupleStream {
  public:
   /// A pointer to the start of a flattened TupleRow in the stream.
@@ -213,7 +214,7 @@ class BufferedTupleStream {
       int64_t max_page_len,
       const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
 
-  virtual ~BufferedTupleStream();
+  ~BufferedTupleStream() { DCHECK(closed_); }
 
   /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called
   /// once before any of the other APIs.
@@ -233,23 +234,23 @@ class BufferedTupleStream {
   /// Prepares the stream for interleaved reads and writes by saving enough reservation
   /// for default-sized read and write pages. Called after Init() and before the first
   /// AddRow() or AddRowCustomBegin() call.
-  /// 'delete_on_read': Pages are deleted after they are read.
+  /// 'attach_on_read': Pages are attached to the output batch after they are read.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     read and write pages and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
   Status PrepareForReadWrite(
-      bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+      bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Prepares the stream for reading, invalidating the write iterator (if there is one).
   /// Therefore must be called after the last AddRow() or AddRowCustomEnd() and before
   /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes
   /// over the stream, unless rows were read from the stream after PrepareForRead() or
-  /// PrepareForReadWrite() was called with delete_on_read = true.
-  /// 'delete_on_read': Pages are deleted after they are read.
+  /// PrepareForReadWrite() was called with attach_on_read = true.
+  /// 'attach_on_read': Pages are attached to the output batch after they are read.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     first read page and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
-  Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+  Status PrepareForRead(bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Adds a single row to the stream. There are three possible outcomes:
   /// a) The append succeeds. True is returned.
@@ -303,7 +304,10 @@ class BufferedTupleStream {
   enum UnpinMode {
     /// All pages in the stream are unpinned and the read/write positions in the stream
     /// are reset. No more rows can be written to the stream after this. The stream can
-    /// be re-read from the beginning by calling PrepareForRead().
+    /// be re-read from the beginning by calling PrepareForRead(). It in invalid to call
+    /// UnpinStream(UNPIN_ALL) if the stream is in 'attach_on_read' mode and >= 1 row has
+    /// been read from the stream, because this would leave the stream in limbo where it
+    /// still has unpinned pages but it cannot be read or written to.
     UNPIN_ALL,
     /// All pages are unpinned aside from the current read and write pages (if any),
     /// which is left in the same state. The unpinned stream can continue being read
@@ -315,14 +319,21 @@ class BufferedTupleStream {
   void UnpinStream(UnpinMode mode);
 
   /// Get the next batch of output rows, which are backed by the stream's memory.
-  /// If the stream is unpinned or 'delete_on_read' is true, the 'needs_deep_copy'
-  /// flag may be set on 'batch' to signal that memory will be freed on the next
-  /// call to GetNext() and that the caller should copy out any data it needs from
-  /// rows in 'batch' or in previous batches returned from GetNext().
   ///
-  /// If the stream is pinned and 'delete_on_read' is false, the memory backing the
+  /// If the stream is in 'attach_on_read' mode then buffers are attached to 'batch'
+  /// when the last row referencing the buffer is returned. The FLUSH_RESOURCES flag
+  /// is always set when attaching such a buffer.
+  /// TODO: always flushing for pinned streams is overkill since we may not need
+  /// to reuse the reservation immediately. Changing this may require modifying
+  /// callers of this class.
+  ///
+  /// If the stream is unpinned and not in 'attach_on_read' mode, the FLUSH_RESOURCES
+  /// flag may be set on the batch to signal that memory will be freed on the next call
+  /// to GetNext() and that the caller should copy out any data it needs from rows in
+  /// 'batch' or in previous batches returned from GetNext().
+  ///
+  /// If the stream is pinned and 'attach_on_read' is false, the memory backing the
   /// rows will remain valid until the stream is unpinned, destroyed, etc.
-  /// TODO: IMPALA-4179: update when we simplify the memory transfer model.
   Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
 
   /// Same as above, but populate 'flat_rows' with a pointer to the flat version of
@@ -370,8 +381,6 @@ class BufferedTupleStream {
 
   /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
   struct Page {
-    Page() : num_rows(0), retrieved_buffer(true) {}
-
     inline int len() const { return handle.len(); }
     inline bool is_pinned() const { return handle.is_pinned(); }
     inline int pin_count() const { return handle.pin_count(); }
@@ -380,17 +389,27 @@ class BufferedTupleStream {
       retrieved_buffer = true;
       return Status::OK();
     }
+
+    /// Attach the buffer from this page to 'batch'. Only valid to call if the page is
+    /// pinned and 'retrieved_buffer' is true. Decrements parent->bytes_pinned_.
+    void AttachBufferToBatch(
+        BufferedTupleStream* parent, RowBatch* batch, RowBatch::FlushMode flush);
+
     std::string DebugString() const;
 
     BufferPool::PageHandle handle;
 
     /// Number of rows written to the page.
-    int num_rows;
+    int num_rows = 0;
 
     /// Whether we called GetBuffer() on the page since it was last pinned. This means
     /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have
     /// returned rows referencing the page's buffer.
-    bool retrieved_buffer;
+    bool retrieved_buffer = true;
+
+    /// If the page was just attached to the output batch on the last GetNext() call while
+    /// in attach_on_read mode. If true, then 'handle' is closed.
+    bool attached_to_output_batch = false;
   };
 
   /// Runtime state instance used to check for cancellation. Not owned.
@@ -400,7 +419,7 @@ class BufferedTupleStream {
   const RowDescriptor* desc_;
 
   /// Plan node ID, used for error reporting.
-  int node_id_;
+  int node_id_ = -1;
 
   /// The size of the fixed length portion for each tuple in the row.
   std::vector<int> fixed_tuple_sizes_;
@@ -420,18 +439,18 @@ class BufferedTupleStream {
   /// List of pages in the stream.
   /// Empty iff one of two cases applies:
   /// * before the first row has been added with AddRow() or AddRowCustom().
-  /// * after the stream has been destructively read in 'delete_on_read' mode
+  /// * after the stream has been destructively read in 'attach_on_read' mode
   std::list<Page> pages_;
   // IMPALA-5629: avoid O(n) list.size() call by explicitly tracking the number of pages.
   // TODO: remove when we switch to GCC5+, where list.size() is O(1). See GCC bug #49561.
-  int64_t num_pages_;
+  int64_t num_pages_ = 0;
 
-  /// Total size of pages_, including any pages already deleted in 'delete_on_read'
+  /// Total size of pages_, including any pages already deleted in 'attach_on_read'
   /// mode.
-  int64_t total_byte_size_;
+  int64_t total_byte_size_ = 0;
 
   /// True if there is currently an active read iterator for the stream.
-  bool has_read_iterator_;
+  bool has_read_iterator_ = false;
 
   /// The current page being read. When no read iterator is active, equal to list.end().
   /// When a read iterator is active, either points to the current read page, or equals
@@ -447,31 +466,31 @@ class BufferedTupleStream {
   BufferPool::SubReservation read_page_reservation_;
 
   /// Number of rows returned from the current read_page_.
-  uint32_t read_page_rows_returned_;
+  uint32_t read_page_rows_returned_ = -1;
 
   /// Pointer into read_page_ to the byte after the last row read.
-  uint8_t* read_ptr_;
+  uint8_t* read_ptr_ = nullptr;
 
   /// Pointer to one byte past the end of read_page_. Used to detect overruns.
-  const uint8_t* read_end_ptr_;
+  const uint8_t* read_end_ptr_ = nullptr;
 
   /// Pointer into write_page_ to the byte after the last row written.
-  uint8_t* write_ptr_;
+  uint8_t* write_ptr_ = nullptr;
 
   /// Pointer to one byte past the end of write_page_. Cached to speed up computation
-  const uint8_t* write_end_ptr_;
+  const uint8_t* write_end_ptr_ = nullptr;
 
   /// Number of rows returned to the caller from GetNext() since the last
   /// PrepareForRead() call.
-  int64_t rows_returned_;
+  int64_t rows_returned_ = 0;
 
   /// True if there is currently an active write iterator into the stream.
-  bool has_write_iterator_;
+  bool has_write_iterator_ = false;
 
   /// The current page for writing. NULL if there is no write iterator or no current
   /// write page. Always pinned. Size is 'default_page_len_', except temporarily while
   /// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd().
-  Page* write_page_;
+  Page* write_page_ = nullptr;
 
   /// Saved reservation for write iterator. 'default_page_len_' reservation is saved if
   /// there is a write iterator, no page currently pinned for writing and the possibility
@@ -484,11 +503,11 @@ class BufferedTupleStream {
 
   /// Total bytes of pinned pages in pages_, stored to avoid iterating over the list
   /// to compute it.
-  int64_t bytes_pinned_;
+  int64_t bytes_pinned_ = 0;
 
   /// Number of rows stored in the stream. Includes rows that were already deleted during
-  /// a destructive 'delete_on_read' pass over the stream.
-  int64_t num_rows_;
+  /// a destructive 'attach_on_read' pass over the stream.
+  int64_t num_rows_ = 0;
 
   /// The default length in bytes of pages used to store the stream's rows. All rows that
   /// fit in a default-sized page are stored in default-sized page.
@@ -503,14 +522,14 @@ class BufferedTupleStream {
   const bool has_nullable_tuple_;
 
   /// If true, pages are deleted after they are read during this read pass. Once rows
-  /// have been read from a stream with 'delete_on_read_' true, this is always true.
-  bool delete_on_read_;
+  /// have been read from a stream with 'attach_on_read_' true, this is always true.
+  bool attach_on_read_ = false;
 
-  bool closed_; // Used for debugging.
+  bool closed_ = false; // Used for debugging.
 
   /// If true, this stream has been explicitly pinned by the caller and all pages are
   /// kept pinned until the caller calls UnpinStream().
-  bool pinned_;
+  bool pinned_ = true;
 
   bool is_read_page(const Page* page) const {
     return read_page_ != pages_.end() && &*read_page_ == page;
@@ -585,7 +604,7 @@ class BufferedTupleStream {
 
   /// Same as PrepareForRead(), except the iterators are not invalidated and
   /// the caller is assumed to have checked there is sufficient unused reservation.
-  Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT;
+  Status PrepareForReadInternal(bool attach_on_read) WARN_UNUSED_RESULT;
 
   /// Pins the next read page. This blocks reading from disk if necessary to bring the
   /// page's data into memory. Updates read_page_, read_ptr_, and
@@ -593,7 +612,9 @@ class BufferedTupleStream {
   Status NextReadPage() WARN_UNUSED_RESULT;
 
   /// Invalidate the read iterator, and release any resources associated with the active
-  /// iterator.
+  /// iterator. Invalid to call if 'attach_on_read_' is true and >= 1 rows have been read,
+  /// because that would leave the stream in limbo where it still has pages but it is
+  /// invalid to read or write from in future.
   void InvalidateReadIterator();
 
   /// Returns the total additional bytes that this row will consume in write_page_ if
@@ -618,7 +639,8 @@ class BufferedTupleStream {
   void UnpinPageIfNeeded(Page* page, bool stream_pinned);
 
   /// Return the expected pin count for 'page' in the current stream based on the current
-  /// read and write pages and whether the stream is pinned.
+  /// read and write pages and whether the stream is pinned. Not valid to call if
+  /// the page was just deleted, i.e. page->attached_to_output_batch == false.
   int ExpectedPinCount(bool stream_pinned, const Page* page) const;
 
   /// Return true if the stream in its current state needs to have a reservation for

http://git-wip-us.apache.org/repos/asf/impala/blob/4af3a785/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 67adb9b..90d8c4d 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -418,6 +418,7 @@ class RowBatch {
   friend class RowBatchSerializeBaseline;
   friend class RowBatchSerializeBenchmark;
   friend class RowBatchSerializeTest;
+  friend class SimpleTupleStreamTest;
 
   /// Creates an empty row batch based on the serialized row batch header. Called from
   /// FromProtobuf() above before desrialization of a protobuf row batch.

http://git-wip-us.apache.org/repos/asf/impala/blob/4af3a785/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 68f313d..91517f1 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -247,6 +247,10 @@ class Tuple {
     return static_cast<bool*>(GetSlot(offset));
   }
 
+  int32_t* GetIntSlot(int offset) {
+    return static_cast<int32_t*>(GetSlot(offset));
+  }
+
   int64_t* GetBigIntSlot(int offset) {
     return static_cast<int64_t*>(GetSlot(offset));
   }