You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/12/15 23:00:45 UTC

[19/50] [abbrv] incubator-impala git commit: IMPALA-4586: don't constant fold in backend

IMPALA-4586: don't constant fold in backend

This patch ensures that setting the query option
enable_expr_rewrites=false disables both constant folding in the
frontend (which it did already) and constant caching in the backend
(which is new in this patch). This gives users a way to revert
to the old behaviour of non-deterministic UDFs from before these
optimisations were added in Impala 2.8.

Before this patch, the backend would cache values based on IsConstant().
This meant that there was no way (e.g. via enable_expr_rewrites) to
disable caching of the values of non-deterministic UDFs.

After this patch, we only cache literal values in the backend. This
offers the same performance as before in the common case where the
frontend will constant fold the expressions anyway.

Also rename some functions to more cleanly separate the backend concepts
of "constant" expressions and expressions that can be evaluated without
a TupleRow. In a future change (IMPALA-4617) we should remove the
IsConstant() analysis logic from the backend entirely and pass the
information from the frontend. We should also fix isConstant() in the
frontend so that it only returns true when it is safe to constant-fold
the expression (IMPALA-4606). Once that is done, we could revert back
to using IsConstant() instead of IsLiteral().

Testing:
Added targeted test to test constant folding of UDFs: we expect
different results depending on whether constant folding is enabled.

Also run TestUdfs with expr rewrites enabled and disabled, since this
can exercise different code paths. Refactored test_udfs somewhat to
avoid running uninteresting combinations of query options for
targeted tests and removed some 'drop * if exists' statements
that aren't necessary when using unique_database.

This change revealed flakiness in test_mem_limit, which seems
to have only worked by coincidence. Updated TrackAllocation() to
actually set the query status when a memory limit is exceeded.
Looped this test for a while to make sure it isn't flaky any
more.

Also fix other test bugs where the vector argument is modified
in-place, which can leak out to other tests.

Change-Id: I0c76e3c8a8d92749256c312080ecd7aac5d99ce7
Reviewed-on: http://gerrit.cloudera.org:8080/5391
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/88448d1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/88448d1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/88448d1d

Branch: refs/heads/hadoop-next
Commit: 88448d1d4ab31eaaf82f764b36dc7d11d4c63c32
Parents: 5ea1798
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Dec 6 13:30:47 2016 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Dec 8 04:53:53 2016 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-context.cc                    |   4 +-
 be/src/exprs/expr-context.h                     |   9 +-
 be/src/exprs/expr.cc                            |   4 +
 be/src/exprs/expr.h                             |  16 +-
 be/src/exprs/literal.cc                         |   4 +
 be/src/exprs/literal.h                          |   2 +
 be/src/exprs/null-literal.cc                    |   4 +
 be/src/exprs/null-literal.h                     |   5 +-
 be/src/exprs/scalar-fn-call.cc                  |  20 +-
 be/src/exprs/slot-ref.cc                        |   2 +-
 be/src/service/fe-support.cc                    |   8 +-
 be/src/udf/udf-internal.h                       |   5 +
 be/src/udf/udf.cc                               |  10 +-
 common/thrift/ImpalaInternalService.thrift      |   2 +-
 .../apache/impala/analysis/AnalyticExpr.java    |   6 +-
 .../apache/impala/analysis/AnalyticWindow.java  |   6 +-
 .../apache/impala/analysis/LimitElement.java    |   2 +-
 .../org/apache/impala/analysis/LiteralExpr.java |   2 +-
 .../org/apache/impala/service/FeSupport.java    |  32 +-
 .../QueryTest/udf-init-close-deterministic.test |  32 +
 .../queries/QueryTest/udf-init-close.test       |  30 -
 .../QueryTest/udf-non-deterministic.test        |  17 +
 .../functional-query/queries/QueryTest/udf.test |  18 +-
 tests/common/test_dimensions.py                 |  22 +-
 tests/query_test/test_udfs.py                   | 603 ++++++++++---------
 25 files changed, 465 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/exprs/expr-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.cc b/be/src/exprs/expr-context.cc
index 94c652a..805e110 100644
--- a/be/src/exprs/expr-context.cc
+++ b/be/src/exprs/expr-context.cc
@@ -150,8 +150,8 @@ void ExprContext::FreeLocalAllocations(const vector<FunctionContext*>& fn_ctxs)
   }
 }
 
-void ExprContext::GetConstantValue(TColumnValue* col_val) {
-  DCHECK(root_->IsConstant());
+void ExprContext::EvaluateWithoutRow(TColumnValue* col_val) {
+  DCHECK_EQ(0, root_->GetSlotIds());
   void* value = GetValue(NULL);
   if (value == NULL) return;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/exprs/expr-context.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h
index f7e1239..6903105 100644
--- a/be/src/exprs/expr-context.h
+++ b/be/src/exprs/expr-context.h
@@ -77,9 +77,10 @@ class ExprContext {
   /// result in result_.
   void* GetValue(const TupleRow* row);
 
-  /// Convenience function for evaluating constant Exprs from the FE. Extracts value into
-  /// col_val and sets the appropriate __isset flag. No fields are set for NULL values.
-  /// The specific field in col_val that receives the value is based on the expr type:
+  /// Convenience function for evaluating Exprs that don't reference slots from the FE.
+  /// Extracts value into 'col_val' and sets the appropriate __isset flag. No fields are
+  /// set for NULL values. The specific field in 'col_val' that receives the value is
+  /// based on the expr type:
   /// TYPE_BOOLEAN: boolVal
   /// TYPE_TINYINT/SMALLINT/INT: intVal
   /// TYPE_BIGINT: longVal
@@ -89,7 +90,7 @@ class ExprContext {
   ///              above 127. Pass the raw bytes so the caller can decide what to
   ///              do with the result (e.g., bail constant folding).
   /// TYPE_TIMESTAMP: binaryVal has the raw data, stringVal its string representation.
-  void GetConstantValue(TColumnValue* col_val);
+  void EvaluateWithoutRow(TColumnValue* col_val);
 
   /// Convenience functions: print value into 'str' or 'stream'.  NULL turns into "NULL".
   void PrintValue(const TupleRow* row, std::string* str);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/exprs/expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.cc b/be/src/exprs/expr.cc
index 3f2ef7b..e0f9516 100644
--- a/be/src/exprs/expr.cc
+++ b/be/src/exprs/expr.cc
@@ -453,6 +453,10 @@ bool Expr::IsConstant() const {
   return true;
 }
 
+bool Expr::IsLiteral() const {
+  return false;
+}
+
 int Expr::GetSlotIds(vector<SlotId>* slot_ids) const {
   int n = 0;
   for (int i = 0; i < children_.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/exprs/expr.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h
index 942eec0..1eee396 100644
--- a/be/src/exprs/expr.h
+++ b/be/src/exprs/expr.h
@@ -165,14 +165,20 @@ class Expr {
   /// expr has an error set.
   Status GetFnContextError(ExprContext* ctx);
 
-  /// Returns true if GetValue(NULL) can be called on this expr and always returns the same
-  /// result (e.g., exprs that don't contain slotrefs). The default implementation returns
+  /// Returns true if the expression is considered constant. This must match the
+  /// definition of Expr.isConstant() in the frontend. The default implementation returns
   /// true if all children are constant.
+  /// TODO: IMPALA-4617 - plumb through the value from the frontend and remove duplicate
+  /// logic.
   virtual bool IsConstant() const;
 
-  /// Returns the slots that are referenced by this expr tree in 'slot_ids'.
-  /// Returns the number of slots added to the vector
-  virtual int GetSlotIds(std::vector<SlotId>* slot_ids) const;
+  /// Returns true if this is a literal expression.
+  virtual bool IsLiteral() const;
+
+  /// Returns the number of SlotRef nodes in the expr tree. If this returns 0, it means it
+  /// is valid to call GetValue(nullptr) on the expr tree.
+  /// If 'slot_ids' is non-null, add the slot ids to it.
+  virtual int GetSlotIds(std::vector<SlotId>* slot_ids = nullptr) const;
 
   /// Returns true iff the expression 'texpr' contains UDF available only as LLVM IR. In
   /// which case, it's impossible to interpret this expression and codegen must be used.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/exprs/literal.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/literal.cc b/be/src/exprs/literal.cc
index 4fd4e2c..0445f64 100644
--- a/be/src/exprs/literal.cc
+++ b/be/src/exprs/literal.cc
@@ -214,6 +214,10 @@ Literal::Literal(ColumnType type, const TimestampValue& v)
   value_.timestamp_val = v;
 }
 
+bool Literal::IsLiteral() const {
+  return true;
+}
+
 template<class T>
 bool ParseString(const string& str, T* val) {
   istringstream stream(str);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/exprs/literal.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/literal.h b/be/src/exprs/literal.h
index b6d1a49..644e3b0 100644
--- a/be/src/exprs/literal.h
+++ b/be/src/exprs/literal.h
@@ -46,6 +46,8 @@ class Literal: public Expr {
   /// Literal.
   static Literal* CreateLiteral(const ColumnType& type, const std::string& str);
 
+  virtual bool IsLiteral() const;
+
   virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn);
 
   virtual impala_udf::BooleanVal GetBooleanVal(ExprContext*, const TupleRow*);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/exprs/null-literal.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/null-literal.cc b/be/src/exprs/null-literal.cc
index 6985c5b..7b0e77c 100644
--- a/be/src/exprs/null-literal.cc
+++ b/be/src/exprs/null-literal.cc
@@ -85,6 +85,10 @@ CollectionVal NullLiteral::GetCollectionVal(ExprContext* context, const TupleRow
   return CollectionVal::null();
 }
 
+bool NullLiteral::IsLiteral() const {
+  return true;
+}
+
 // Generated IR for a bigint NULL literal:
 //
 // define { i8, i64 } @NullLiteral(i8* %context, %"class.impala::TupleRow"* %row) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/exprs/null-literal.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/null-literal.h b/be/src/exprs/null-literal.h
index 0a0f9a1..d908baa 100644
--- a/be/src/exprs/null-literal.h
+++ b/be/src/exprs/null-literal.h
@@ -28,6 +28,9 @@ class TExprNode;
 class NullLiteral: public Expr {
  public:
   NullLiteral(PrimitiveType type) : Expr(type) { }
+
+  virtual bool IsLiteral() const;
+
   virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn);
 
   virtual impala_udf::BooleanVal GetBooleanVal(ExprContext*, const TupleRow*);
@@ -46,7 +49,7 @@ class NullLiteral: public Expr {
 
  protected:
   friend class Expr;
-  
+
   NullLiteral(const TExprNode& node) : Expr(node) { }
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/exprs/scalar-fn-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc
index 4cbd4d8..44f6ecc 100644
--- a/be/src/exprs/scalar-fn-call.cc
+++ b/be/src/exprs/scalar-fn-call.cc
@@ -211,11 +211,23 @@ Status ScalarFnCall::Open(RuntimeState* state, ExprContext* ctx,
         input_arg = reinterpret_cast<AnyVal*>(varargs_buffer);
         varargs_buffer += arg_bytes;
       }
-      const AnyVal* constant_arg = fn_ctx->impl()->constant_args()[i];
-      if (constant_arg == NULL) {
-        non_constant_args.emplace_back(children_[i], input_arg);
-      } else {
+      // IMPALA-4586: Cache constant arguments only if the frontend has rewritten them
+      // into literal expressions. This gives the frontend control over how expressions
+      // are evaluated. This means that setting enable_expr_rewrites=false will also
+      // disable caching of non-literal constant expressions, which gives the old
+      // behaviour (before this caching optimisation was added) of repeatedly evaluating
+      // exprs that are constant according to IsConstant(). For exprs that are not truly
+      // constant (yet IsConstant() returns true for) e.g. non-deterministic UDFs, this
+      // means that setting enable_expr_rewrites=false works as a safety valve to get
+      // back the old behaviour, before constant expr folding or caching was added.
+      // TODO: once we can annotate UDFs as non-deterministic (IMPALA-4606), we should
+      // be able to trust IsConstant() and switch back to that.
+      if (children_[i]->IsLiteral()) {
+        const AnyVal* constant_arg = fn_ctx->impl()->constant_args()[i];
+        DCHECK(constant_arg != NULL);
         memcpy(input_arg, constant_arg, arg_bytes);
+      } else {
+        non_constant_args.emplace_back(children_[i], input_arg);
       }
     }
     fn_ctx->impl()->SetNonConstantArgs(move(non_constant_args));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/exprs/slot-ref.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/slot-ref.cc b/be/src/exprs/slot-ref.cc
index 3a70b6c..22c22b6 100644
--- a/be/src/exprs/slot-ref.cc
+++ b/be/src/exprs/slot-ref.cc
@@ -101,7 +101,7 @@ Status SlotRef::Prepare(RuntimeState* state, const RowDescriptor& row_desc,
 }
 
 int SlotRef::GetSlotIds(vector<SlotId>* slot_ids) const {
-  slot_ids->push_back(slot_id_);
+  if (slot_ids != nullptr) slot_ids->push_back(slot_id_);
   return 1;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index b4e2906..9415a76 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -76,7 +76,7 @@ Java_org_apache_impala_service_FeSupport_NativeFeTestInit(
 // the expr evaluation.
 extern "C"
 JNIEXPORT jbyteArray JNICALL
-Java_org_apache_impala_service_FeSupport_NativeEvalConstExprs(
+Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
     JNIEnv* env, jclass caller_class, jbyteArray thrift_expr_batch,
     jbyteArray thrift_query_ctx_bytes) {
   Status status;
@@ -144,7 +144,7 @@ Java_org_apache_impala_service_FeSupport_NativeEvalConstExprs(
     if (!status.ok()) goto error;
 
     TColumnValue val;
-    expr_ctx->GetConstantValue(&val);
+    expr_ctx->EvaluateWithoutRow(&val);
     status = expr_ctx->root()->GetFnContextError(expr_ctx);
     if (!status.ok()) goto error;
 
@@ -356,8 +356,8 @@ static JNINativeMethod native_methods[] = {
     (void*)::Java_org_apache_impala_service_FeSupport_NativeFeTestInit
   },
   {
-    (char*)"NativeEvalConstExprs", (char*)"([B[B)[B",
-    (void*)::Java_org_apache_impala_service_FeSupport_NativeEvalConstExprs
+    (char*)"NativeEvalExprsWithoutRow", (char*)"([B[B)[B",
+    (void*)::Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow
   },
   {
     (char*)"NativeCacheJar", (char*)"([B)[B",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/udf/udf-internal.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 2fd7415..bf3032e 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -144,6 +144,11 @@ class FunctionContextImpl {
   /// Return false if 'buf' is null; returns true otherwise.
   bool CheckAllocResult(const char* fn_name, uint8_t* buf, int64_t byte_size);
 
+  /// A utility function which checks for memory limits that may have been exceeded by
+  /// Allocate(), Reallocate(), AllocateLocal() or TrackAllocation(). Sets the
+  /// appropriate error status if necessary.
+  void CheckMemLimit(const char* fn_name, int64_t byte_size);
+
   /// Preallocated buffer for storing varargs (if the function has any). Allocated and
   /// owned by this object, but populated by an Expr function. The buffer is interpreted
   /// as an array of the appropriate AnyVal subclass.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index cd29f6f..9c3b131 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -280,14 +280,19 @@ inline bool FunctionContextImpl::CheckAllocResult(const char* fn_name,
     context_->SetError(ss.str().c_str());
     return false;
   }
+  CheckMemLimit(fn_name, byte_size);
+  return true;
+}
+
+inline void FunctionContextImpl::CheckMemLimit(const char* fn_name,
+    int64_t byte_size) {
 #ifndef IMPALA_UDF_SDK_BUILD
   MemTracker* mem_tracker = pool_->mem_tracker();
-  if (mem_tracker->LimitExceeded()) {
+  if (mem_tracker->AnyLimitExceeded()) {
     ErrorMsg msg = ErrorMsg(TErrorCode::UDF_MEM_LIMIT_EXCEEDED, string(fn_name));
     state_->SetMemLimitExceeded(mem_tracker, byte_size, &msg);
   }
 #endif
-  return true;
 }
 
 uint8_t* FunctionContext::Allocate(int byte_size) noexcept {
@@ -351,6 +356,7 @@ void FunctionContext::TrackAllocation(int64_t bytes) {
   assert(!impl_->closed_);
   impl_->external_bytes_tracked_ += bytes;
   impl_->pool_->mem_tracker()->Consume(bytes);
+  impl_->CheckMemLimit("FunctionContext::TrackAllocation", bytes);
 }
 
 void FunctionContext::Free(int64_t bytes) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index f18947a..5e79c07 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -317,7 +317,7 @@ struct TQueryCtx {
   13: required i64 start_unix_millis
 
   // Hint to disable codegen. Set by planner for single-node optimization or by the
-  // backend in NativeEvalConstExprs() in FESupport. This flag is only advisory to
+  // backend in NativeEvalExprsWithoutRow() in FESupport. This flag is only advisory to
   // avoid the overhead of codegen and can be ignored if codegen is needed functionally.
   14: optional bool disable_codegen_hint = false;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
index 40fee99..0a51808 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
@@ -402,7 +402,7 @@ public class AnalyticExpr extends Expr {
       isPosConstant = false;
     } else {
       try {
-        TColumnValue val = FeSupport.EvalConstExpr(offset, analyzer.getQueryCtx());
+        TColumnValue val = FeSupport.EvalExprWithoutRow(offset, analyzer.getQueryCtx());
         if (TColumnValueUtil.getNumericVal(val) <= 0) isPosConstant = false;
       } catch (InternalException exc) {
         throw new AnalysisException(
@@ -502,8 +502,8 @@ public class AnalyticExpr extends Expr {
         }
         // Check if argument value is zero or negative and throw an exception if found.
         try {
-          TColumnValue bucketValue =
-              FeSupport.EvalConstExpr(getFnCall().getChild(0), analyzer.getQueryCtx());
+          TColumnValue bucketValue = FeSupport.EvalExprWithoutRow(
+              getFnCall().getChild(0), analyzer.getQueryCtx());
           Long arg = bucketValue.getLong_val();
           if (arg <= 0) {
             throw new AnalysisException("NTILE() requires a positive argument: " + arg);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
index 6fda1a6..42b94a2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
@@ -288,7 +288,7 @@ public class AnalyticWindow {
     if (e.isConstant() && e.getType().isNumericType()) {
       try {
         val = TColumnValueUtil.getNumericVal(
-            FeSupport.EvalConstExpr(e, analyzer.getQueryCtx()));
+            FeSupport.EvalExprWithoutRow(e, analyzer.getQueryCtx()));
         if (val <= 0) isPos = false;
       } catch (InternalException exc) {
         throw new AnalysisException(
@@ -329,8 +329,8 @@ public class AnalyticWindow {
         e2 != null && e2.isConstant() && e2.getType().isNumericType());
 
     try {
-      TColumnValue val1 = FeSupport.EvalConstExpr(e1, analyzer.getQueryCtx());
-      TColumnValue val2 = FeSupport.EvalConstExpr(e2, analyzer.getQueryCtx());
+      TColumnValue val1 = FeSupport.EvalExprWithoutRow(e1, analyzer.getQueryCtx());
+      TColumnValue val2 = FeSupport.EvalExprWithoutRow(e2, analyzer.getQueryCtx());
       double left = TColumnValueUtil.getNumericVal(val1);
       double right = TColumnValueUtil.getNumericVal(val2);
       if (left > right) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/LimitElement.java b/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
index 30cb4dc..4645b95 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
@@ -146,7 +146,7 @@ class LimitElement {
       throws AnalysisException {
     TColumnValue val = null;
     try {
-      val = FeSupport.EvalConstExpr(expr, analyzer.getQueryCtx());
+      val = FeSupport.EvalExprWithoutRow(expr, analyzer.getQueryCtx());
     } catch (InternalException e) {
       throw new AnalysisException("Failed to evaluate expr: " + expr.toSql(), e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java b/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
index a4052c6..db0b442 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
@@ -170,7 +170,7 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr
 
     TColumnValue val = null;
     try {
-      val = FeSupport.EvalConstExpr(constExpr, queryCtx);
+      val = FeSupport.EvalExprWithoutRow(constExpr, queryCtx);
     } catch (InternalException e) {
       LOG.error(String.format("Failed to evaluate expr '%s'",
           constExpr.toSql(), e.getMessage()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/fe/src/main/java/org/apache/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index c578726..8b87962 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.NullLiteral;
+import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TCacheJarParams;
@@ -68,8 +69,8 @@ public class FeSupport {
   public native static void NativeFeTestInit();
 
   // Returns a serialized TResultRow
-  public native static byte[] NativeEvalConstExprs(byte[] thriftExprBatch,
-      byte[] thriftQueryGlobals);
+  public native static byte[] NativeEvalExprsWithoutRow(
+      byte[] thriftExprBatch, byte[] thriftQueryGlobals);
 
   // Returns a serialized TSymbolLookupResult
   public native static byte[] NativeLookupSymbol(byte[] thriftSymbolLookup);
@@ -118,16 +119,16 @@ public class FeSupport {
     return NativeCacheJar(thriftParams);
   }
 
-  public static TColumnValue EvalConstExpr(Expr expr, TQueryCtx queryCtx)
+  public static TColumnValue EvalExprWithoutRow(Expr expr, TQueryCtx queryCtx)
       throws InternalException {
-    Preconditions.checkState(expr.isConstant());
+    Preconditions.checkState(!expr.contains(SlotRef.class));
     TExprBatch exprBatch = new TExprBatch();
     exprBatch.addToExprs(expr.treeToThrift());
     TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
     byte[] result;
     try {
-      result = EvalConstExprs(serializer.serialize(exprBatch),
-          serializer.serialize(queryCtx));
+      result = EvalExprsWithoutRow(
+          serializer.serialize(exprBatch), serializer.serialize(queryCtx));
       Preconditions.checkNotNull(result);
       TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
       TResultRow val = new TResultRow();
@@ -165,14 +166,14 @@ public class FeSupport {
     }
   }
 
-  private static byte[] EvalConstExprs(byte[] thriftExprBatch,
-      byte[] thriftQueryContext) {
+  private static byte[] EvalExprsWithoutRow(
+      byte[] thriftExprBatch, byte[] thriftQueryContext) {
     try {
-      return NativeEvalConstExprs(thriftExprBatch, thriftQueryContext);
+      return NativeEvalExprsWithoutRow(thriftExprBatch, thriftQueryContext);
     } catch (UnsatisfiedLinkError e) {
       loadLibrary();
     }
-    return NativeEvalConstExprs(thriftExprBatch, thriftQueryContext);
+    return NativeEvalExprsWithoutRow(thriftExprBatch, thriftQueryContext);
   }
 
   public static boolean EvalPredicate(Expr pred, TQueryCtx queryCtx)
@@ -181,7 +182,7 @@ public class FeSupport {
     if (pred instanceof BoolLiteral) return ((BoolLiteral) pred).getValue();
     if (pred instanceof NullLiteral) return false;
     Preconditions.checkState(pred.getType().isBoolean());
-    TColumnValue val = EvalConstExpr(pred, queryCtx);
+    TColumnValue val = EvalExprWithoutRow(pred, queryCtx);
     // Return false if pred evaluated to false or NULL. True otherwise.
     return val.isBool_val() && val.bool_val;
   }
@@ -193,7 +194,8 @@ public class FeSupport {
    *
    * TODO: This function is currently used for improving the performance of
    * partition pruning (see IMPALA-887), hence it only supports boolean
-   * exprs. In the future, we can extend it to support arbitrary constant exprs.
+   * exprs. In the future, we can extend it to support arbitrary exprs without
+   * SlotRefs.
    */
   public static TResultRow EvalPredicateBatch(ArrayList<Expr> exprs,
       TQueryCtx queryCtx) throws InternalException {
@@ -202,13 +204,13 @@ public class FeSupport {
     for (Expr expr: exprs) {
       // Make sure we only process boolean exprs.
       Preconditions.checkState(expr.getType().isBoolean());
-      Preconditions.checkState(expr.isConstant());
+      Preconditions.checkState(!expr.contains(SlotRef.class));
       exprBatch.addToExprs(expr.treeToThrift());
     }
     byte[] result;
     try {
-      result = EvalConstExprs(serializer.serialize(exprBatch),
-          serializer.serialize(queryCtx));
+      result = EvalExprsWithoutRow(
+          serializer.serialize(exprBatch), serializer.serialize(queryCtx));
       Preconditions.checkNotNull(result);
       TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
       TResultRow val = new TResultRow();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/testdata/workloads/functional-query/queries/QueryTest/udf-init-close-deterministic.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf-init-close-deterministic.test b/testdata/workloads/functional-query/queries/QueryTest/udf-init-close-deterministic.test
new file mode 100644
index 0000000..b02d7f5
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/udf-init-close-deterministic.test
@@ -0,0 +1,32 @@
+====
+---- QUERY
+# Test UDF used as constant partition key.
+drop table if exists udfinserttest;
+create table udfinserttest (a int) partitioned by (udf_was_opened string);
+
+insert overwrite table udfinserttest
+partition (udf_was_opened=cast(validate_open(1) as string)) values (1);
+
+# IMPALA-1030: exercise the case where a partition already exists
+insert overwrite table udfinserttest
+partition (udf_was_opened=cast(validate_open(1) as string)) values (1);
+
+# Don't overwrite
+insert into table udfinserttest
+partition (udf_was_opened=cast(validate_open(1) as string)) values (2);
+====
+---- QUERY
+select * from udfinserttest;
+---- TYPES
+int, string
+---- RESULTS
+1,'1'
+2,'1'
+====
+---- QUERY
+# Limit expression must be evaluated in FE.
+select bool_col from functional.alltypestiny limit if(validate_open(1), 0, 1)
+---- TYPES
+boolean
+---- RESULTS
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/testdata/workloads/functional-query/queries/QueryTest/udf-init-close.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf-init-close.test b/testdata/workloads/functional-query/queries/QueryTest/udf-init-close.test
index 23f1e90..6d59638 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/udf-init-close.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/udf-init-close.test
@@ -15,29 +15,6 @@ boolean
 true
 ====
 ---- QUERY
-drop table if exists udfinserttest;
-create table udfinserttest (a int) partitioned by (udf_was_opened string);
-
-insert overwrite table udfinserttest
-partition (udf_was_opened=cast(validate_open(1) as string)) values (1);
-
-# IMPALA-1030: exercise the case where a partition already exists
-insert overwrite table udfinserttest
-partition (udf_was_opened=cast(validate_open(1) as string)) values (1);
-
-# Don't overwrite
-insert into table udfinserttest
-partition (udf_was_opened=cast(validate_open(1) as string)) values (2);
-====
----- QUERY
-select * from udfinserttest;
----- TYPES
-int, string
----- RESULTS
-1,'1'
-2,'1'
-====
----- QUERY
 # merge node
 select validate_open(0);
 ---- TYPES
@@ -134,10 +111,3 @@ true
 true
 true
 ====
----- QUERY
-# FE
-select bool_col from functional.alltypestiny limit if(validate_open(1), 0, 1)
----- TYPES
-boolean
----- RESULTS
-====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/testdata/workloads/functional-query/queries/QueryTest/udf-non-deterministic.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf-non-deterministic.test b/testdata/workloads/functional-query/queries/QueryTest/udf-non-deterministic.test
new file mode 100644
index 0000000..c806d9d
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/udf-non-deterministic.test
@@ -0,0 +1,17 @@
+====
+---- QUERY
+# Only run without expr rewrites (constant folding) because count_rows() is
+# non-deterministic.
+select count_rows() from functional.alltypestiny;
+---- TYPES
+BIGINT
+---- RESULTS
+1
+2
+3
+4
+5
+6
+7
+8
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/testdata/workloads/functional-query/queries/QueryTest/udf.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf.test b/testdata/workloads/functional-query/queries/QueryTest/udf.test
index 8da52c1..d605d76 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/udf.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/udf.test
@@ -523,25 +523,9 @@ INT
 210
 ====
 ---- QUERY
-# Disable expr rewrites (constant folding) because count_rows() is non-deterministic.
-set enable_expr_rewrites=false;
-select count_rows() from functional.alltypestiny;
----- TYPES
-BIGINT
----- RESULTS
-1
-2
-3
-4
-5
-6
-7
-8
-====
----- QUERY
 select pow(3,2), xpow(3,2);
 ---- TYPES
 DOUBLE, DOUBLE
 ---- RESULTS
 9,9
-====
\ No newline at end of file
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/tests/common/test_dimensions.py
----------------------------------------------------------------------
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 95195df..8f886b8 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -129,6 +129,18 @@ def create_exec_option_dimension(cluster_sizes=ALL_CLUSTER_SIZES,
                                  disable_codegen_options=ALL_DISABLE_CODEGEN_OPTIONS,
                                  batch_sizes=ALL_BATCH_SIZES,
                                  sync_ddl=None, exec_single_node_option=[0]):
+  exec_option_dimensions = {
+      'abort_on_error': [1],
+      'exec_single_node_rows_threshold': exec_single_node_option,
+      'batch_size': batch_sizes,
+      'disable_codegen': disable_codegen_options,
+      'num_nodes': cluster_sizes}
+
+  if sync_ddl is not None:
+    exec_option_dimensions['sync_ddl'] = sync_ddl
+  return create_exec_option_dimension_from_dict(exec_option_dimensions)
+
+def create_exec_option_dimension_from_dict(exec_option_dimensions):
   """
   Builds a query exec option test dimension
 
@@ -140,16 +152,6 @@ def create_exec_option_dimension(cluster_sizes=ALL_CLUSTER_SIZES,
   TODO: In the future we could generate these values using pairwise to reduce total
   execution time.
   """
-  exec_option_dimensions = {
-      'abort_on_error': [1],
-      'exec_single_node_rows_threshold': exec_single_node_option,
-      'batch_size': batch_sizes,
-      'disable_codegen': disable_codegen_options,
-      'num_nodes': cluster_sizes}
-
-  if sync_ddl is not None:
-    exec_option_dimensions['sync_ddl'] = sync_ddl
-
   # Generate the cross product (all combinations) of the exec options specified. Then
   # store them in exec_option dictionary format.
   keys = sorted(exec_option_dimensions)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88448d1d/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 274ebef..56f1253 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from copy import copy
 import os
 import pytest
 from subprocess import check_call
@@ -25,58 +26,344 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfLocal
 from tests.common.test_dimensions import (
     create_exec_option_dimension,
+    create_exec_option_dimension_from_dict,
     create_uncompressed_text_dimension)
 from tests.util.calculation_util import get_random_id
 from tests.util.filesystem_utils import get_fs_path, IS_S3
 
-class TestUdfs(ImpalaTestSuite):
+class TestUdfBase(ImpalaTestSuite):
+  """
+  Base class with utility functions for testing UDFs.
+  """
+  def _check_exception(self, e):
+    """Returns if 'e' or its inner_exception (whichever holds the message) reports
+    'Memory limit exceeded' or 'Cancelled'; otherwise re-raises 'e'."""
+    if 'Memory limit exceeded' in str(e) or 'Cancelled' in str(e):
+      return
+    inner = e.inner_exception
+    if inner is not None and ('Memory limit exceeded' in inner.message
+                              or 'Cancelled' in inner.message):
+      return
+    raise e
+
+  def _run_query_all_impalads(self, exec_options, query, expected):
+    """Runs 'query' on every impalad and asserts each one returns 'expected'."""
+    for daemon in ImpalaCluster().impalads:
+      conn = daemon.service.create_beeswax_client()
+      outcome = self.execute_query_expect_success(conn, query, exec_options)
+      assert outcome.data == expected
+
+  def _load_functions(self, template, vector, database, location):
+    """Formats 'template' with 'database' and 'location', then runs each non-empty
+    ';'-separated statement with the exec options from 'vector'."""
+    queries = template.format(database=database, location=location)
+    exec_options = vector.get_value('exec_option')
+    # Split queries and skip empty statements (e.g. after the trailing ';').
+    for query in (q for q in queries.split(';') if q.strip()):
+      result = self.execute_query_expect_success(self.client, query, exec_options)
+      assert result is not None
+
+  # Create sample UDA functions in {database} from library {location}
+  create_sample_udas_template = """
+create aggregate function {database}.test_count(int) returns bigint
+location '{location}' update_fn='CountUpdate';
+
+create aggregate function {database}.hll(int) returns string
+location '{location}' update_fn='HllUpdate';
+
+create aggregate function {database}.sum_small_decimal(decimal(9,2))
+returns decimal(9,2) location '{location}' update_fn='SumSmallDecimalUpdate';
+"""
+
+  # Create test UDA functions in {database} from library {location}
+  create_test_udas_template = """
+create aggregate function {database}.trunc_sum(double)
+returns bigint intermediate double location '{location}'
+update_fn='TruncSumUpdate' merge_fn='TruncSumMerge'
+serialize_fn='TruncSumSerialize' finalize_fn='TruncSumFinalize';
+
+create aggregate function {database}.arg_is_const(int, int)
+returns boolean location '{location}'
+init_fn='ArgIsConstInit' update_fn='ArgIsConstUpdate' merge_fn='ArgIsConstMerge';
+
+create aggregate function {database}.toggle_null(int)
+returns int location '{location}'
+update_fn='ToggleNullUpdate' merge_fn='ToggleNullMerge';
+
+create aggregate function {database}.count_nulls(bigint)
+returns bigint location '{location}'
+update_fn='CountNullsUpdate' merge_fn='CountNullsMerge';
+"""
+
+  # Create test UDF functions in {database} from library {location}
+  create_udfs_template = """
+create function {database}.identity(boolean) returns boolean
+location '{location}' symbol='Identity';
+
+create function {database}.identity(tinyint) returns tinyint
+location '{location}' symbol='Identity';
+
+create function {database}.identity(smallint) returns smallint
+location '{location}' symbol='Identity';
+
+create function {database}.identity(int) returns int
+location '{location}' symbol='Identity';
+
+create function {database}.identity(bigint) returns bigint
+location '{location}' symbol='Identity';
+
+create function {database}.identity(float) returns float
+location '{location}' symbol='Identity';
+
+create function {database}.identity(double) returns double
+location '{location}' symbol='Identity';
+
+create function {database}.identity(string) returns string
+location '{location}'
+symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_9StringValE';
+
+create function {database}.identity(timestamp) returns timestamp
+location '{location}'
+symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_12TimestampValE';
+
+create function {database}.identity(decimal(9,0)) returns decimal(9,0)
+location '{location}'
+symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_10DecimalValE';
+
+create function {database}.identity(decimal(18,1)) returns decimal(18,1)
+location '{location}'
+symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_10DecimalValE';
+
+create function {database}.identity(decimal(38,10)) returns decimal(38,10)
+location '{location}'
+symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_10DecimalValE';
+
+create function {database}.all_types_fn(
+    string, boolean, tinyint, smallint, int, bigint, float, double, decimal(2,0))
+returns int
+location '{location}' symbol='AllTypes';
+
+create function {database}.no_args() returns string
+location '{location}'
+symbol='_Z6NoArgsPN10impala_udf15FunctionContextE';
+
+create function {database}.var_and(boolean...) returns boolean
+location '{location}' symbol='VarAnd';
+
+create function {database}.var_sum(int...) returns int
+location '{location}' symbol='VarSum';
+
+create function {database}.var_sum(double...) returns double
+location '{location}' symbol='VarSum';
+
+create function {database}.var_sum(string...) returns int
+location '{location}' symbol='VarSum';
+
+create function {database}.var_sum(decimal(4,2)...) returns decimal(18,2)
+location '{location}' symbol='VarSum';
+
+create function {database}.var_sum_multiply(double, int...) returns double
+location '{location}'
+symbol='_Z14VarSumMultiplyPN10impala_udf15FunctionContextERKNS_9DoubleValEiPKNS_6IntValE';
+
+create function {database}.var_sum_multiply2(double, int...) returns double
+location '{location}'
+symbol='_Z15VarSumMultiply2PN10impala_udf15FunctionContextERKNS_9DoubleValEiPKNS_6IntValE';
+
+create function {database}.xpow(double, double) returns double
+location '{location}'
+symbol='_ZN6impala13MathFunctions3PowEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_';
+
+create function {database}.to_lower(string) returns string
+location '{location}'
+symbol='_Z7ToLowerPN10impala_udf15FunctionContextERKNS_9StringValE';
+
+create function {database}.constant_timestamp() returns timestamp
+location '{location}' symbol='ConstantTimestamp';
+
+create function {database}.validate_arg_type(string) returns boolean
+location '{location}' symbol='ValidateArgType';
+
+create function {database}.count_rows() returns bigint
+location '{location}' symbol='Count' prepare_fn='CountPrepare' close_fn='CountClose';
+
+create function {database}.constant_arg(int) returns int
+location '{location}' symbol='ConstantArg' prepare_fn='ConstantArgPrepare' close_fn='ConstantArgClose';
+
+create function {database}.validate_open(int) returns boolean
+location '{location}' symbol='ValidateOpen'
+prepare_fn='ValidateOpenPrepare' close_fn='ValidateOpenClose';
+
+create function {database}.mem_test(bigint) returns bigint
+location '{location}' symbol='MemTest'
+prepare_fn='MemTestPrepare' close_fn='MemTestClose';
+
+create function {database}.mem_test_leaks(bigint) returns bigint
+location '{location}' symbol='MemTest'
+prepare_fn='MemTestPrepare';
+
+-- Regression test for IMPALA-1475
+create function {database}.unmangled_symbol() returns bigint
+location '{location}' symbol='UnmangledSymbol';
+
+create function {database}.four_args(int, int, int, int) returns int
+location '{location}' symbol='FourArgs';
+
+create function {database}.five_args(int, int, int, int, int) returns int
+location '{location}' symbol='FiveArgs';
+
+create function {database}.six_args(int, int, int, int, int, int) returns int
+location '{location}' symbol='SixArgs';
+
+create function {database}.seven_args(int, int, int, int, int, int, int) returns int
+location '{location}' symbol='SevenArgs';
+
+create function {database}.eight_args(int, int, int, int, int, int, int, int) returns int
+location '{location}' symbol='EightArgs';
+
+create function {database}.twenty_args(int, int, int, int, int, int, int, int, int, int,
+    int, int, int, int, int, int, int, int, int, int) returns int
+location '{location}' symbol='TwentyArgs';
+
+create function {database}.twenty_one_args(int, int, int, int, int, int, int, int, int, int,
+    int, int, int, int, int, int, int, int, int, int, int) returns int
+location '{location}' symbol='TwentyOneArgs';
+"""
+
+class TestUdfExecution(TestUdfBase):
+  """Test execution of UDFs with a combination of different query options."""
   @classmethod
   def get_workload(cls):
     return 'functional-query'
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestUdfs, cls).add_test_dimensions()
+    super(TestUdfExecution, cls).add_test_dimensions()
     cls.TestMatrix.add_dimension(
-      create_exec_option_dimension(disable_codegen_options=[False, True],
-                                   exec_single_node_option=[0,100]))
+        create_exec_option_dimension_from_dict({"disable_codegen" : [False, True],
+          "exec_single_node_rows_threshold" : [0,100],
+          "enable_expr_rewrites" : [False, True]}))
     # There is no reason to run these tests using all dimensions.
     cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
 
   def test_native_functions(self, vector, unique_database):
-    self.__load_functions(
+    enable_expr_rewrites = vector.get_value('exec_option')['enable_expr_rewrites']
+    self._load_functions(
       self.create_udfs_template, vector, unique_database,
       get_fs_path('/test-warehouse/libTestUdfs.so'))
-    self.__load_functions(
+    self._load_functions(
       self.create_sample_udas_template, vector, unique_database,
       get_fs_path('/test-warehouse/libudasample.so'))
-    self.__load_functions(
+    self._load_functions(
       self.create_test_udas_template, vector, unique_database,
       get_fs_path('/test-warehouse/libTestUdas.so'))
 
     self.run_test_case('QueryTest/udf', vector, use_db=unique_database)
     if not vector.get_value('exec_option')['disable_codegen']:
       self.run_test_case('QueryTest/udf-codegen-required', vector, use_db=unique_database)
-    self.run_test_case('QueryTest/udf-init-close', vector, use_db=unique_database)
     self.run_test_case('QueryTest/uda', vector, use_db=unique_database)
+    self.run_test_case('QueryTest/udf-init-close', vector, use_db=unique_database)
+    # Some tests assume determinism or non-determinism, which depends on expr rewrites.
+    if enable_expr_rewrites:
+      self.run_test_case('QueryTest/udf-init-close-deterministic', vector,
+          use_db=unique_database)
+    else:
+      self.run_test_case('QueryTest/udf-non-deterministic', vector,
+          use_db=unique_database)
 
   def test_ir_functions(self, vector, unique_database):
     if vector.get_value('exec_option')['disable_codegen']:
       # IR functions require codegen to be enabled.
       return
-    self.__load_functions(
+    enable_expr_rewrites = vector.get_value('exec_option')['enable_expr_rewrites']
+    self._load_functions(
       self.create_udfs_template, vector, unique_database,
       get_fs_path('/test-warehouse/test-udfs.ll'))
     self.run_test_case('QueryTest/udf', vector, use_db=unique_database)
     self.run_test_case('QueryTest/udf-init-close', vector, use_db=unique_database)
+    # Some tests assume determinism or non-determinism, which depends on expr rewrites.
+    if enable_expr_rewrites:
+      self.run_test_case('QueryTest/udf-init-close-deterministic', vector,
+          use_db=unique_database)
+    else:
+      self.run_test_case('QueryTest/udf-non-deterministic', vector,
+          use_db=unique_database)
+
+  def test_java_udfs(self, vector, unique_database):
+    self.run_test_case('QueryTest/load-java-udfs', vector, use_db=unique_database)
+    self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database)
 
   def test_udf_errors(self, vector, unique_database):
-    # Disable codegen to force interpretation path to be taken.
+    # Only run with codegen disabled to force interpretation path to be taken.
     # Aim to exercise two failure cases:
     # 1. too many arguments
     # 2. IR UDF
-    vector.get_value('exec_option')['disable_codegen'] = 1
-    self.run_test_case('QueryTest/udf-errors', vector, use_db=unique_database)
+    if vector.get_value('exec_option')['disable_codegen']:
+      self.run_test_case('QueryTest/udf-errors', vector, use_db=unique_database)
+
+  # Run serially because this will blow the process limit, potentially causing other
+  # queries to fail
+  @pytest.mark.execute_serially
+  def test_mem_limits(self, vector, unique_database):
+    """Runs the UDF and UDA mem-limit test files with a 1MB mem_limit. Each is expected
+    to raise an ImpalaBeeswaxException, which is then vetted by _check_exception(); a
+    run that does not fail trips the assert."""
+    # copy() is shallow: the copy presumably still shares the exec_option dict, so the
+    # mem_limit set below would leak into other tests. Deep copy to isolate it.
+    from copy import deepcopy
+    vector = deepcopy(vector)
+    # Set the mem limit high enough that a simple scan can run
+    vector.get_value('exec_option')['mem_limit'] = 1024 * 1024
+    for test_file in ('QueryTest/udf-mem-limit', 'QueryTest/uda-mem-limit'):
+      try:
+        self.run_test_case(test_file, vector, use_db=unique_database)
+        assert False, "Query was expected to fail"
+      except ImpalaBeeswaxException, e:
+        self._check_exception(e)
+
+  def test_udf_constant_folding(self, vector, unique_database):
+    """Test that constant folding of UDFs is handled correctly. Uses count_rows(),
+    which returns a unique value every time it is evaluated in the same thread."""
+    exec_options = copy(vector.get_value('exec_option'))
+    # Execute on a single node so that all counter values will be unique.
+    exec_options["num_nodes"] = 1
+    create_fn_query = """create function {database}.count_rows() returns bigint
+                         location '{location}' symbol='Count' prepare_fn='CountPrepare'
+                         close_fn='CountClose'"""
+    self._load_functions(create_fn_query, vector, unique_database,
+        get_fs_path('/test-warehouse/libTestUdfs.so'))
+
+    # Only one distinct value if the expression is constant folded, otherwise one
+    # value per row in alltypes
+    expected_ndv = 1 if exec_options['enable_expr_rewrites'] else 7300
+
+    # Test fully constant expression, which the FE folds when expr rewrites are enabled.
+    query = "select `{0}`.count_rows() from functional.alltypes".format(unique_database)
+    result = self.execute_query_expect_success(self.client, query, exec_options)
+    actual_ndv = len(set(result.data))
+    assert actual_ndv == expected_ndv
+
+    # Test constant argument to a non-constant expr. The argument value can be
+    # cached in the backend.
+    query = """select concat(cast(`{0}`.count_rows() as string), '-', string_col)
+               from functional.alltypes""".format(unique_database)
+    result = self.execute_query_expect_success(self.client, query, exec_options)
+    actual_ndv = len(set(value.split("-")[0] for value in result.data))
+    assert actual_ndv == expected_ndv
+
+
+class TestUdfTargeted(TestUdfBase):
+  """Targeted UDF tests that don't need to be run under the full combination of
+  exec options."""
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestUdfTargeted, cls).add_test_dimensions()
+    # There is no reason to run these tests using all dimensions.
+    cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
 
   def test_udf_invalid_symbol(self, vector, unique_database):
     """ IMPALA-1642: Impala crashes if the symbol for a Hive UDF doesn't exist
@@ -97,10 +384,6 @@ class TestUdfs(ImpalaTestSuite):
       assert "Unable to find class" in str(ex)
     self.client.execute(drop_fn_stmt)
 
-  def test_java_udfs(self, vector, unique_database):
-    self.run_test_case('QueryTest/load-java-udfs', vector, use_db=unique_database)
-    self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database)
-
   @SkipIfLocal.multiple_impalad
   def test_hive_udfs_missing_jar(self, vector, unique_database):
     """ IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present
@@ -121,7 +404,7 @@ class TestUdfs(ImpalaTestSuite):
     client = impalad.service.create_beeswax_client()
     # Create and drop functions with sync_ddl to make sure they are reflected
     # in every impalad.
-    exec_option = vector.get_value('exec_option')
+    exec_option = copy(vector.get_value('exec_option'))
     exec_option['sync_ddl'] = 1
 
     self.execute_query_expect_success(client, drop_fn_stmt, exec_option)
@@ -148,7 +431,7 @@ class TestUdfs(ImpalaTestSuite):
     """Test updating the UDF binary without restarting Impala. Dropping
     the function should remove the binary from the local cache."""
     # Run with sync_ddl to guarantee the drop is processed by all impalads.
-    exec_options = vector.get_value('exec_option')
+    exec_options = copy(vector.get_value('exec_option'))
     exec_options['sync_ddl'] = 1
     old_udf = os.path.join(
         os.environ['IMPALA_HOME'], 'testdata/udfs/impala-hive-udfs.jar')
@@ -168,20 +451,20 @@ class TestUdfs(ImpalaTestSuite):
     check_call(["hadoop", "fs", "-put", "-f", old_udf, udf_dst])
     self.execute_query_expect_success(self.client, drop_fn_stmt, exec_options)
     self.execute_query_expect_success(self.client, create_fn_stmt, exec_options)
-    self.__run_query_all_impalads(exec_options, query_stmt, ["Old UDF"])
+    self._run_query_all_impalads(exec_options, query_stmt, ["Old UDF"])
 
     # Update the binary, drop and create the function again. The new binary should
     # be running.
     check_call(["hadoop", "fs", "-put", "-f", new_udf, udf_dst])
     self.execute_query_expect_success(self.client, drop_fn_stmt, exec_options)
     self.execute_query_expect_success(self.client, create_fn_stmt, exec_options)
-    self.__run_query_all_impalads(exec_options, query_stmt, ["New UDF"])
+    self._run_query_all_impalads(exec_options, query_stmt, ["New UDF"])
 
   def test_udf_update_via_create(self, vector, unique_database):
     """Test updating the UDF binary without restarting Impala. Creating a new function
     from the library should refresh the cache."""
     # Run with sync_ddl to guarantee the create is processed by all impalads.
-    exec_options = vector.get_value('exec_option')
+    exec_options = copy(vector.get_value('exec_option'))
     exec_options['sync_ddl'] = 1
     old_udf = os.path.join(
         os.environ['IMPALA_HOME'], 'testdata/udfs/impala-hive-udfs.jar')
@@ -208,7 +491,7 @@ class TestUdfs(ImpalaTestSuite):
     check_call(["hadoop", "fs", "-put", "-f", old_udf, udf_dst])
     self.execute_query_expect_success(
         self.client, create_fn_template.format(old_function_name), exec_options)
-    self.__run_query_all_impalads(
+    self._run_query_all_impalads(
         exec_options, query_template.format(old_function_name), ["Old UDF"])
 
     # Update the binary, and create a new function using the binary. The new binary
@@ -216,11 +499,11 @@ class TestUdfs(ImpalaTestSuite):
     check_call(["hadoop", "fs", "-put", "-f", new_udf, udf_dst])
     self.execute_query_expect_success(
         self.client, create_fn_template.format(new_function_name), exec_options)
-    self.__run_query_all_impalads(
+    self._run_query_all_impalads(
         exec_options, query_template.format(new_function_name), ["New UDF"])
 
     # The old function should use the new library now
-    self.__run_query_all_impalads(
+    self._run_query_all_impalads(
         exec_options, query_template.format(old_function_name), ["New UDF"])
 
   def test_drop_function_while_running(self, vector, unique_database):
@@ -252,275 +535,3 @@ class TestUdfs(ImpalaTestSuite):
     assert results.success
     assert len(results.data) == 9999
 
-  # Run serially because this will blow the process limit, potentially causing other
-  # queries to fail
-  @pytest.mark.execute_serially
-  def test_mem_limits(self, vector, unique_database):
-    # Set the mem limit high enough that a simple scan can run
-    mem_limit = 1024 * 1024
-    vector.get_value('exec_option')['mem_limit'] = mem_limit
-
-    try:
-      self.run_test_case('QueryTest/udf-mem-limit', vector, use_db=unique_database)
-      assert False, "Query was expected to fail"
-    except ImpalaBeeswaxException, e:
-      self.__check_exception(e)
-
-    try:
-      self.run_test_case('QueryTest/uda-mem-limit', vector, use_db=unique_database)
-      assert False, "Query was expected to fail"
-    except ImpalaBeeswaxException, e:
-      self.__check_exception(e)
-
-  def __check_exception(self, e):
-    # The interesting exception message may be in 'e' or in its inner_exception
-    # depending on the point of query failure.
-    if 'Memory limit exceeded' in str(e) or 'Cancelled' in str(e):
-      return
-    if e.inner_exception is not None\
-       and ('Memory limit exceeded' in e.inner_exception.message
-            or 'Cancelled' not in e.inner_exception.message):
-      return
-    raise e
-
-  def __run_query_all_impalads(self, exec_options, query, expected):
-    impala_cluster = ImpalaCluster()
-    for impalad in impala_cluster.impalads:
-      client = impalad.service.create_beeswax_client()
-      result = self.execute_query_expect_success(client, query, exec_options)
-      assert result.data == expected
-
-  def __load_functions(self, template, vector, database, location):
-    queries = template.format(database=database, location=location)
-    # Split queries and remove empty lines
-    queries = [q for q in queries.split(';') if q.strip()]
-    exec_options = vector.get_value('exec_option')
-    for query in queries:
-      if query.strip() == '': continue
-      result = self.execute_query_expect_success(self.client, query, exec_options)
-      assert result is not None
-
-  # Create sample UDA functions in {database} from library {location}
-  create_sample_udas_template = """
-drop function if exists {database}.test_count(int);
-drop function if exists {database}.hll(int);
-drop function if exists {database}.sum_small_decimal(decimal(9,2));
-
-create database if not exists {database};
-
-create aggregate function {database}.test_count(int) returns bigint
-location '{location}' update_fn='CountUpdate';
-
-create aggregate function {database}.hll(int) returns string
-location '{location}' update_fn='HllUpdate';
-
-create aggregate function {database}.sum_small_decimal(decimal(9,2))
-returns decimal(9,2) location '{location}' update_fn='SumSmallDecimalUpdate';
-"""
-
-  # Create test UDA functions in {database} from library {location}
-  create_test_udas_template = """
-drop function if exists {database}.trunc_sum(double);
-drop function if exists {database}.arg_is_const(int, int);
-drop function if exists {database}.toggle_null(int);
-drop function if exists {database}.count_nulls(bigint);
-
-create database if not exists {database};
-
-create aggregate function {database}.trunc_sum(double)
-returns bigint intermediate double location '{location}'
-update_fn='TruncSumUpdate' merge_fn='TruncSumMerge'
-serialize_fn='TruncSumSerialize' finalize_fn='TruncSumFinalize';
-
-create aggregate function {database}.arg_is_const(int, int)
-returns boolean location '{location}'
-init_fn='ArgIsConstInit' update_fn='ArgIsConstUpdate' merge_fn='ArgIsConstMerge';
-
-create aggregate function {database}.toggle_null(int)
-returns int location '{location}'
-update_fn='ToggleNullUpdate' merge_fn='ToggleNullMerge';
-
-create aggregate function {database}.count_nulls(bigint)
-returns bigint location '{location}'
-update_fn='CountNullsUpdate' merge_fn='CountNullsMerge';
-"""
-
-  # Create test UDF functions in {database} from library {location}
-  create_udfs_template = """
-drop function if exists {database}.identity(boolean);
-drop function if exists {database}.identity(tinyint);
-drop function if exists {database}.identity(smallint);
-drop function if exists {database}.identity(int);
-drop function if exists {database}.identity(bigint);
-drop function if exists {database}.identity(float);
-drop function if exists {database}.identity(double);
-drop function if exists {database}.identity(string);
-drop function if exists {database}.identity(timestamp);
-drop function if exists {database}.identity(decimal(9,0));
-drop function if exists {database}.identity(decimal(18,1));
-drop function if exists {database}.identity(decimal(38,10));
-drop function if exists {database}.all_types_fn(
-    string, boolean, tinyint, smallint, int, bigint, float, double, decimal(2,0));
-drop function if exists {database}.no_args();
-drop function if exists {database}.var_and(boolean...);
-drop function if exists {database}.var_sum(int...);
-drop function if exists {database}.var_sum(double...);
-drop function if exists {database}.var_sum(string...);
-drop function if exists {database}.var_sum(decimal(4,2)...);
-drop function if exists {database}.var_sum_multiply(double, int...);
-drop function if exists {database}.var_sum_multiply2(double, int...);
-drop function if exists {database}.xpow(double, double);
-drop function if exists {database}.to_lower(string);
-drop function if exists {database}.constant_timestamp();
-drop function if exists {database}.validate_arg_type(string);
-drop function if exists {database}.count_rows();
-drop function if exists {database}.constant_arg(int);
-drop function if exists {database}.validate_open(int);
-drop function if exists {database}.mem_test(bigint);
-drop function if exists {database}.mem_test_leaks(bigint);
-drop function if exists {database}.unmangled_symbol();
-drop function if exists {database}.four_args(int, int, int, int);
-drop function if exists {database}.five_args(int, int, int, int, int);
-drop function if exists {database}.six_args(int, int, int, int, int, int);
-drop function if exists {database}.seven_args(int, int, int, int, int, int, int);
-drop function if exists {database}.eight_args(int, int, int, int, int, int, int, int);
-drop function if exists {database}.nine_args(int, int, int, int, int, int, int, int, int);
-drop function if exists {database}.twenty_args(int, int, int, int, int, int, int, int, int,
-    int, int, int, int, int, int, int, int, int, int, int);
-drop function if exists {database}.twenty_one_args(int, int, int, int, int, int, int, int,
-    int, int, int, int, int, int, int, int, int, int, int, int, int);
-
-create database if not exists {database};
-
-create function {database}.identity(boolean) returns boolean
-location '{location}' symbol='Identity';
-
-create function {database}.identity(tinyint) returns tinyint
-location '{location}' symbol='Identity';
-
-create function {database}.identity(smallint) returns smallint
-location '{location}' symbol='Identity';
-
-create function {database}.identity(int) returns int
-location '{location}' symbol='Identity';
-
-create function {database}.identity(bigint) returns bigint
-location '{location}' symbol='Identity';
-
-create function {database}.identity(float) returns float
-location '{location}' symbol='Identity';
-
-create function {database}.identity(double) returns double
-location '{location}' symbol='Identity';
-
-create function {database}.identity(string) returns string
-location '{location}'
-symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_9StringValE';
-
-create function {database}.identity(timestamp) returns timestamp
-location '{location}'
-symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_12TimestampValE';
-
-create function {database}.identity(decimal(9,0)) returns decimal(9,0)
-location '{location}'
-symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_10DecimalValE';
-
-create function {database}.identity(decimal(18,1)) returns decimal(18,1)
-location '{location}'
-symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_10DecimalValE';
-
-create function {database}.identity(decimal(38,10)) returns decimal(38,10)
-location '{location}'
-symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_10DecimalValE';
-
-create function {database}.all_types_fn(
-    string, boolean, tinyint, smallint, int, bigint, float, double, decimal(2,0))
-returns int
-location '{location}' symbol='AllTypes';
-
-create function {database}.no_args() returns string
-location '{location}'
-symbol='_Z6NoArgsPN10impala_udf15FunctionContextE';
-
-create function {database}.var_and(boolean...) returns boolean
-location '{location}' symbol='VarAnd';
-
-create function {database}.var_sum(int...) returns int
-location '{location}' symbol='VarSum';
-
-create function {database}.var_sum(double...) returns double
-location '{location}' symbol='VarSum';
-
-create function {database}.var_sum(string...) returns int
-location '{location}' symbol='VarSum';
-
-create function {database}.var_sum(decimal(4,2)...) returns decimal(18,2)
-location '{location}' symbol='VarSum';
-
-create function {database}.var_sum_multiply(double, int...) returns double
-location '{location}'
-symbol='_Z14VarSumMultiplyPN10impala_udf15FunctionContextERKNS_9DoubleValEiPKNS_6IntValE';
-
-create function {database}.var_sum_multiply2(double, int...) returns double
-location '{location}'
-symbol='_Z15VarSumMultiply2PN10impala_udf15FunctionContextERKNS_9DoubleValEiPKNS_6IntValE';
-
-create function {database}.xpow(double, double) returns double
-location '{location}'
-symbol='_ZN6impala13MathFunctions3PowEPN10impala_udf15FunctionContextERKNS1_9DoubleValES6_';
-
-create function {database}.to_lower(string) returns string
-location '{location}'
-symbol='_Z7ToLowerPN10impala_udf15FunctionContextERKNS_9StringValE';
-
-create function {database}.constant_timestamp() returns timestamp
-location '{location}' symbol='ConstantTimestamp';
-
-create function {database}.validate_arg_type(string) returns boolean
-location '{location}' symbol='ValidateArgType';
-
-create function {database}.count_rows() returns bigint
-location '{location}' symbol='Count' prepare_fn='CountPrepare' close_fn='CountClose';
-
-create function {database}.constant_arg(int) returns int
-location '{location}' symbol='ConstantArg' prepare_fn='ConstantArgPrepare' close_fn='ConstantArgClose';
-
-create function {database}.validate_open(int) returns boolean
-location '{location}' symbol='ValidateOpen'
-prepare_fn='ValidateOpenPrepare' close_fn='ValidateOpenClose';
-
-create function {database}.mem_test(bigint) returns bigint
-location '{location}' symbol='MemTest'
-prepare_fn='MemTestPrepare' close_fn='MemTestClose';
-
-create function {database}.mem_test_leaks(bigint) returns bigint
-location '{location}' symbol='MemTest'
-prepare_fn='MemTestPrepare';
-
--- Regression test for IMPALA-1475
-create function {database}.unmangled_symbol() returns bigint
-location '{location}' symbol='UnmangledSymbol';
-
-create function {database}.four_args(int, int, int, int) returns int
-location '{location}' symbol='FourArgs';
-
-create function {database}.five_args(int, int, int, int, int) returns int
-location '{location}' symbol='FiveArgs';
-
-create function {database}.six_args(int, int, int, int, int, int) returns int
-location '{location}' symbol='SixArgs';
-
-create function {database}.seven_args(int, int, int, int, int, int, int) returns int
-location '{location}' symbol='SevenArgs';
-
-create function {database}.eight_args(int, int, int, int, int, int, int, int) returns int
-location '{location}' symbol='EightArgs';
-
-create function {database}.twenty_args(int, int, int, int, int, int, int, int, int, int,
-    int, int, int, int, int, int, int, int, int, int) returns int
-location '{location}' symbol='TwentyArgs';
-
-create function {database}.twenty_one_args(int, int, int, int, int, int, int, int, int, int,
-    int, int, int, int, int, int, int, int, int, int, int) returns int
-location '{location}' symbol='TwentyOneArgs';
-"""