You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/02/16 21:03:42 UTC

[1/5] incubator-impala git commit: IMPALA-4729: Implement REPLACE()

Repository: incubator-impala
Updated Branches:
  refs/heads/master 894bb7785 -> 26eaa2660


IMPALA-4729: Implement REPLACE()

This turned out to be slightly non-trivial as REPLACE is already a
keyword, and thus the parser needs to be tweaked to allow this,
since function names act as bare identifiers.

It was difficult to get this to match performance of regexp_replace.
For expanding patterns, the fact that regexp_replace copies the
expansion inline means that it may in fact win on large strings
with sparse matches that are > dcache size apart.  Let's leave
optimizing that for later.

Testing: Added a full test for maximum size strings and got most
of the boundary conditions I could identify.  Manually ran queries
on TPC-H dataset in impala to verify both performance and correctness.
Added large string and exprs.test test clauses and ran the tests to
verify they work as expected.

Change-Id: I1780a7d8fee6d0db9dad148217fb6eb10f773329
Reviewed-on: http://gerrit.cloudera.org:8080/5776
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0715a303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0715a303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0715a303

Branch: refs/heads/master
Commit: 0715a303ea0db01c108ece3d7692da158ada5c20
Parents: 894bb77
Author: Zach Amsden <za...@cloudera.com>
Authored: Tue Jan 24 03:04:38 2017 +0000
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 15 01:33:23 2017 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-test.cc                       | 116 ++++++++++++++
 be/src/exprs/string-functions-ir.cc             | 160 +++++++++++++++++++
 be/src/exprs/string-functions.h                 |   4 +
 be/src/gutil/bits.h                             |   3 -
 be/src/udf/udf-internal.h                       |   5 +
 be/src/udf/udf-test-harness.cc                  |   5 +-
 be/src/udf/udf-test-harness.h                   |   4 +-
 be/src/udf/udf.cc                               |  41 ++++-
 be/src/udf/udf.h                                |  14 ++
 common/function-registry/impala_functions.py    |   3 +
 fe/src/main/cup/sql-parser.cup                  |   4 +-
 .../org/apache/impala/analysis/ParserTest.java  |   8 +-
 .../queries/QueryTest/exprs.test                |  27 ++++
 .../queries/QueryTest/large_strings.test        |  18 +++
 14 files changed, 400 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 52e74c5..5f3d1d0 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -36,12 +36,14 @@
 #include "exprs/like-predicate.h"
 #include "exprs/literal.h"
 #include "exprs/null-literal.h"
+#include "exprs/string-functions.h"
 #include "exprs/timestamp-functions.h"
 #include "gen-cpp/Exprs_types.h"
 #include "gen-cpp/hive_metastore_types.h"
 #include "rpc/thrift-client.h"
 #include "rpc/thrift-server.h"
 #include "runtime/runtime-state.h"
+#include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/string-value.h"
@@ -50,6 +52,7 @@
 #include "service/impala-server.h"
 #include "testutil/impalad-query-executor.h"
 #include "testutil/in-process-servers.h"
+#include "udf/udf-test-harness.h"
 #include "util/debug-util.h"
 #include "util/string-parser.h"
 #include "util/test-info.h"
@@ -2085,6 +2088,119 @@ TEST_F(ExprTest, StringFunctions) {
     TestIsNull(length_aliases[i] + "(NULL)", TYPE_INT);
   }
 
+  TestStringValue("replace('aaaaaa', 'a', 'b')", "bbbbbb");
+  TestStringValue("replace('aaaaaa', 'aa', 'b')", "bbb");
+  TestStringValue("replace('aaaaaaa', 'aa', 'b')", "bbba");
+  TestStringValue("replace('zzzaaaaaaqqq', 'a', 'b')", "zzzbbbbbbqqq");
+  TestStringValue("replace('zzzaaaaaaaqqq', 'aa', 'b')", "zzzbbbaqqq");
+  TestStringValue("replace('aaaaaaaaaaaaaaaaaaaa', 'a', 'b')", "bbbbbbbbbbbbbbbbbbbb");
+  TestStringValue("replace('bobobobobo', 'bobo', 'a')", "aabo");
+  TestStringValue("replace('abc', 'abcd', 'a')", "abc");
+  TestStringValue("replace('aaaaaa', '', 'b')", "aaaaaa");
+  TestStringValue("replace('abc', 'abc', '')", "");
+  TestStringValue("replace('abcdefg', 'z', '')", "abcdefg");
+  TestStringValue("replace('', 'zoltan', 'sultan')", "");
+  TestStringValue("replace('strstrstr', 'str', 'strstr')", "strstrstrstrstrstr");
+  TestStringValue("replace('aaaaaa', 'a', '')", "");
+  TestIsNull("replace(NULL, 'foo', 'bar')", TYPE_STRING);
+  TestIsNull("replace('zomg', 'foo', NULL)", TYPE_STRING);
+  TestIsNull("replace('abc', NULL, 'a')", TYPE_STRING);
+
+  // Do some tests with huge strings
+  const int huge_size = 500000;
+  std::string huge_str;
+  huge_str.reserve(huge_size);
+  huge_str.append(huge_size, 'A');
+
+  std::string huge_space;
+  huge_space.reserve(huge_size);
+  huge_space.append(huge_size, ' ');
+
+  std::string huger_str;
+  huger_str.reserve(3 * huge_size);
+  huger_str.append(3 * huge_size, 'A');
+
+  TestStringValue("replace('" + huge_str + "', 'A', ' ')", huge_space);
+  TestStringValue("replace('" + huge_str + "', 'A', '')", "");
+
+  TestStringValue("replace('" + huge_str + "', 'A', 'AAA')", huger_str);
+  TestStringValue("replace('" + huger_str + "', 'AAA', 'A')", huge_str);
+
+  auto* giga_buf = new std::array<uint8_t, StringVal::MAX_LENGTH>;
+  giga_buf->fill('A');
+  (*giga_buf)[0] = 'Z';
+  (*giga_buf)[10] = 'Z';
+  (*giga_buf)[100] = 'Z';
+  (*giga_buf)[1000] = 'Z';
+  (*giga_buf)[StringVal::MAX_LENGTH-1] = 'Z';
+
+  // Hack up a function context so we can call Replace functions directly.
+  MemTracker m;
+  MemPool pool(&m);
+  FunctionContext::TypeDesc str_desc;
+  str_desc.type = FunctionContext::Type::TYPE_STRING;
+  std::vector<FunctionContext::TypeDesc> v(3, str_desc);
+  auto context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool);
+
+  StringVal giga(static_cast<uint8_t*>(giga_buf->data()), StringVal::MAX_LENGTH);
+  StringVal a("A");
+  StringVal z("Z");
+  StringVal aaa("aaa");
+
+  // Replace z's with a's on giga
+  auto r1 = StringFunctions::Replace(context, giga, z, a);
+  EXPECT_EQ(r1.ptr[0], 'A');
+  EXPECT_EQ(r1.ptr[10], 'A');
+  EXPECT_EQ(r1.ptr[100], 'A');
+  EXPECT_EQ(r1.ptr[1000], 'A');
+  EXPECT_EQ(r1.ptr[StringVal::MAX_LENGTH-1], 'A');
+
+  // Entire string match is legal
+  auto r2 = StringFunctions::Replace(context, giga, giga, a);
+  EXPECT_EQ(r2, a);
+
+  // So is replacing giga with itself
+  auto r3 = StringFunctions::Replace(context, giga, giga, giga);
+  EXPECT_EQ(r3.ptr[0], 'Z');
+  EXPECT_EQ(r3.ptr[10], 'Z');
+  EXPECT_EQ(r3.ptr[100], 'Z');
+  EXPECT_EQ(r3.ptr[1000], 'Z');
+  EXPECT_EQ(r3.ptr[StringVal::MAX_LENGTH-1], 'Z');
+
+  // Expect expansion to fail as soon as possible; test with unallocated string space
+  // This tests overflowing the first allocation.
+  auto* short_buf = new std::array<uint8_t, 4096>;
+  short_buf->fill('A');
+  (*short_buf)[1000] = 'Z';
+  StringVal bam(static_cast<uint8_t*>(short_buf->data()), StringVal::MAX_LENGTH);
+  auto r4 = StringFunctions::Replace(context, bam, z, aaa);
+  EXPECT_TRUE(r4.is_null);
+
+  // Similar test for second overflow.  This tests overflowing on re-allocation.
+  (*short_buf)[4095] = 'Z';
+  StringVal bam2(static_cast<uint8_t*>(short_buf->data()), StringVal::MAX_LENGTH-2);
+  auto r5 = StringFunctions::Replace(context, bam2, z, aaa);
+  EXPECT_TRUE(r5.is_null);
+
+  // Finally, test expanding to exactly MAX_LENGTH
+  // There are 4 Zs in giga4 (not including the trailing one, as we truncate that)
+  StringVal giga4(static_cast<uint8_t*>(giga_buf->data()), StringVal::MAX_LENGTH-8);
+  auto r6 = StringFunctions::Replace(context, giga4, z, aaa);
+  EXPECT_EQ(strncmp((char*)&r6.ptr[0], "aaaA", 4), 0);
+  EXPECT_EQ(r6.len, 1 << 30);
+
+  // Finally, an expansion in the last string position
+  (*giga_buf)[StringVal::MAX_LENGTH-11] = 'Z';
+  StringVal giga5(static_cast<uint8_t*>(giga_buf->data()), StringVal::MAX_LENGTH-10);
+  auto r7 = StringFunctions::Replace(context, giga5, z, aaa);
+  EXPECT_EQ(r7.len, 1 << 30);
+  EXPECT_EQ(strncmp((char*)&r7.ptr[StringVal::MAX_LENGTH-4], "Aaaa", 4), 0);
+
+  UdfTestHarness::CloseContext(context);
+  delete giga_buf;
+  delete short_buf;
+  pool.FreeAll();
+
   TestStringValue("reverse('abcdefg')", "gfedcba");
   TestStringValue("reverse('')", "");
   TestIsNull("reverse(NULL)", TYPE_STRING);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/be/src/exprs/string-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/string-functions-ir.cc b/be/src/exprs/string-functions-ir.cc
index 54efbb3..dcef9cf 100644
--- a/be/src/exprs/string-functions-ir.cc
+++ b/be/src/exprs/string-functions-ir.cc
@@ -23,10 +23,13 @@
 #include <re2/stringpiece.h>
 #include <bitset>
 
+#include <boost/static_assert.hpp>
+
 #include "exprs/anyval-util.h"
 #include "exprs/expr.h"
 #include "runtime/string-value.inline.h"
 #include "runtime/tuple-row.h"
+#include "util/bit-util.h"
 #include "util/coding-util.h"
 #include "util/url-parser.h"
 
@@ -204,6 +207,163 @@ StringVal StringFunctions::InitCap(FunctionContext* context, const StringVal& st
   return result;
 }
 
+struct ReplaceContext {
+  ReplaceContext(StringVal *pattern_in) {
+    pattern = StringValue::FromStringVal(*pattern_in);
+    search = StringSearch(&pattern);
+  }
+  StringValue pattern;
+  StringSearch search;
+};
+
+void StringFunctions::ReplacePrepare(FunctionContext* context,
+    FunctionContext::FunctionStateScope scope) {
+  if (scope != FunctionContext::FRAGMENT_LOCAL) return;
+  if (!context->IsArgConstant(1)) return;
+  DCHECK_EQ(context->GetArgType(1)->type, FunctionContext::TYPE_STRING);
+  StringVal* pattern = reinterpret_cast<StringVal*>(context->GetConstantArg(1));
+  if (pattern->is_null || pattern->len == 0) return;
+
+  struct ReplaceContext* replace = context->Allocate<ReplaceContext>();
+  if (replace != nullptr) {
+    new(replace) ReplaceContext(pattern);
+    context->SetFunctionState(scope, replace);
+  }
+}
+
+void StringFunctions::ReplaceClose(FunctionContext* context,
+    FunctionContext::FunctionStateScope scope) {
+  if (scope != FunctionContext::FRAGMENT_LOCAL) return;
+  ReplaceContext* rptr = reinterpret_cast<ReplaceContext*>
+      (context->GetFunctionState(FunctionContext::FRAGMENT_LOCAL));
+  if (rptr != nullptr) context->Free(reinterpret_cast<uint8_t*>(rptr));
+}
+
+StringVal StringFunctions::Replace(FunctionContext* context, const StringVal& str,
+    const StringVal& pattern, const StringVal& replace) {
+  DCHECK_LE(str.len, StringVal::MAX_LENGTH);
+  DCHECK_LE(pattern.len, StringVal::MAX_LENGTH);
+  DCHECK_LE(replace.len, StringVal::MAX_LENGTH);
+  if (str.is_null || pattern.is_null || replace.is_null) return StringVal::null();
+  if (pattern.len == 0 || pattern.len > str.len) return str;
+
+  // StringSearch keeps a pointer to the StringValue object, so it must remain
+  // in scope if used.
+  StringSearch search;
+  StringValue needle;
+  const StringSearch *search_ptr;
+  const ReplaceContext* rptr = reinterpret_cast<ReplaceContext*>
+      (context->GetFunctionState(FunctionContext::FRAGMENT_LOCAL));
+  if (UNLIKELY(rptr == nullptr)) {
+    needle = StringValue::FromStringVal(pattern);
+    search = StringSearch(&needle);
+    search_ptr = &search;
+  } else {
+    search_ptr = &rptr->search;
+  }
+
+  const StringValue haystack = StringValue::FromStringVal(str);
+  int64_t match_pos = search_ptr->Search(&haystack);
+
+  // No match?  Skip everything.
+  if (match_pos < 0) return str;
+
+  DCHECK_GT(pattern.len, 0);
+  DCHECK_GE(haystack.len, pattern.len);
+  int buffer_space;
+  const int delta = replace.len - pattern.len;
+  // MAX_LENGTH is unsigned, so convert back to int to do correctly signed compare
+  DCHECK_LE(delta, static_cast<int>(StringVal::MAX_LENGTH) - 1);
+  if ((delta > 0 && delta < 128) && haystack.len <= 128) {
+    // Quick estimate for potential matches - this heuristic is needed to win
+    // over regexp_replace on expanding patterns.  128 is arbitrarily chosen so
+    // we can't massively over-estimate the buffer size.
+    int matches_possible = 0;
+    char c = pattern.ptr[0];
+    for (int i = 0; i <= haystack.len - pattern.len; ++i) {
+      if (haystack.ptr[i] == c) ++matches_possible;
+    }
+    buffer_space = haystack.len + matches_possible * delta;
+  } else {
+    // Note - cannot overflow because pattern.len is at least one
+    static_assert(StringVal::MAX_LENGTH - 1 + StringVal::MAX_LENGTH <=
+        std::numeric_limits<decltype(buffer_space)>::max(),
+        "Buffer space computation can overflow");
+    buffer_space = haystack.len + delta;
+  }
+
+  StringVal result(context, buffer_space);
+  // If the result went over MAX_LENGTH, we can get a null result back
+  if (UNLIKELY(result.is_null)) return result;
+
+  uint8_t* ptr = result.ptr;
+  int consumed = 0;
+  while (match_pos + pattern.len <= haystack.len) {
+    // Copy in original string
+    const int unmatched_bytes = match_pos - consumed;
+    memcpy(ptr, &haystack.ptr[consumed], unmatched_bytes);
+    DCHECK_LE(ptr - result.ptr + unmatched_bytes, buffer_space);
+    ptr += unmatched_bytes;
+
+    // Copy in replacement - always safe since we always leave room for one more replace
+    DCHECK_LE(ptr - result.ptr + replace.len, buffer_space);
+    memcpy(ptr, replace.ptr, replace.len);
+    ptr += replace.len;
+
+    // Don't want to re-match within already replaced pattern
+    match_pos += pattern.len;
+    consumed = match_pos;
+
+    StringValue haystack_substring = haystack.Substring(match_pos);
+    int match_pos_in_substring = search_ptr->Search(&haystack_substring);
+    if (match_pos_in_substring < 0) break;
+
+    match_pos += match_pos_in_substring;
+
+    // If we had an enlarging pattern, we may need more space
+    if (delta > 0) {
+      const int bytes_produced = ptr - result.ptr;
+      const int bytes_remaining = haystack.len - consumed;
+      DCHECK_LE(bytes_produced, StringVal::MAX_LENGTH);
+      DCHECK_LE(bytes_remaining, StringVal::MAX_LENGTH - 1);
+      // Note: by above, cannot overflow
+      const int min_output = bytes_produced + bytes_remaining;
+      DCHECK_LE(min_output, StringVal::MAX_LENGTH);
+      // Also no overflow: min_output <= MAX_LENGTH and delta <= MAX_LENGTH - 1
+      const int64_t space_needed = min_output + delta;
+      if (UNLIKELY(space_needed > buffer_space)) {
+        // Double at smaller sizes, but don't grow more than a megabyte a
+        // time at larger sizes.  Reasoning: let the allocator do its job
+        // and don't depend on policy here.
+        static_assert(StringVal::MAX_LENGTH % (1 << 20) == 0,
+            "Math requires StringVal::MAX_LENGTH to be a multiple of 1MB");
+        // Must compute next power of two using 64-bit math to avoid signed overflow
+        // The following DCHECK was supposed to be a static assertion, but C++11 is
+        // broken and doesn't declare std::min or std::max to be constexpr.  Fix this
+        // when eventually the minimum supported standard is raised to at least C++14
+        DCHECK_EQ(static_cast<int>(std::min<int64_t>(
+            BitUtil::RoundUpToPowerOfTwo(StringVal::MAX_LENGTH+1),
+            StringVal::MAX_LENGTH + (1 << 20))),
+            StringVal::MAX_LENGTH + (1 << 20));
+        buffer_space = static_cast<int>(std::min<int64_t>(
+            BitUtil::RoundUpToPowerOfTwo(space_needed),
+            space_needed + (1 << 20)));
+        if (UNLIKELY(!result.Resize(context, buffer_space))) return StringVal::null();
+        // Don't forget to move the pointer
+        ptr = result.ptr + bytes_produced;
+      }
+    }
+  }
+
+  // Copy in remainder and re-adjust size
+  const int bytes_remaining = haystack.len - consumed;
+  result.len = ptr - result.ptr + bytes_remaining;
+  DCHECK_LE(result.len, buffer_space);
+  memcpy(ptr, &haystack.ptr[consumed], bytes_remaining);
+
+  return result;
+}
+
 StringVal StringFunctions::Reverse(FunctionContext* context, const StringVal& str) {
   if (str.is_null) return StringVal::null();
   StringVal result(context, str.len);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/be/src/exprs/string-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/string-functions.h b/be/src/exprs/string-functions.h
index fb3a900..3dad5f6 100644
--- a/be/src/exprs/string-functions.h
+++ b/be/src/exprs/string-functions.h
@@ -53,6 +53,10 @@ class StringFunctions {
   static StringVal Lower(FunctionContext*, const StringVal& str);
   static StringVal Upper(FunctionContext*, const StringVal& str);
   static StringVal InitCap(FunctionContext*, const StringVal& str);
+  static void ReplacePrepare(FunctionContext*, FunctionContext::FunctionStateScope);
+  static void ReplaceClose(FunctionContext*, FunctionContext::FunctionStateScope);
+  static StringVal Replace(FunctionContext*, const StringVal& str,
+      const StringVal& pattern, const StringVal& replace);
   static StringVal Reverse(FunctionContext*, const StringVal& str);
   static StringVal Translate(FunctionContext*, const StringVal& str, const StringVal& src,
       const StringVal& dst);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/be/src/gutil/bits.h
----------------------------------------------------------------------
diff --git a/be/src/gutil/bits.h b/be/src/gutil/bits.h
index 579d954..bce7f69 100644
--- a/be/src/gutil/bits.h
+++ b/be/src/gutil/bits.h
@@ -4,9 +4,6 @@
 
 #include "gutil/basictypes.h"
 #include "gutil/integral_types.h"
-#include <glog/logging.h>
-#include "gutil/logging-inl.h"
-#include "gutil/macros.h"
 
 #ifndef _BITS_H_
 #define _BITS_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/be/src/udf/udf-internal.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 907bb22..dc8b0b5 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -92,6 +92,11 @@ class FunctionContextImpl {
   /// TODO: free them at the batch level and save some copies?
   uint8_t* AllocateLocal(int64_t byte_size) noexcept;
 
+  /// Resize a local allocation.
+  /// If the new allocation causes the memory limit to be exceeded, the error will be set
+  /// in this object causing the query to fail.
+  uint8_t* ReallocateLocal(uint8_t* ptr, int64_t byte_size) noexcept;
+
   /// Frees all allocations returned by AllocateLocal().
   void FreeLocalAllocations() noexcept;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/be/src/udf/udf-test-harness.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-test-harness.cc b/be/src/udf/udf-test-harness.cc
index d34b5ee..46cbbe0 100644
--- a/be/src/udf/udf-test-harness.cc
+++ b/be/src/udf/udf-test-harness.cc
@@ -28,8 +28,9 @@ using namespace impala;
 
 FunctionContext* UdfTestHarness::CreateTestContext(
     const FunctionContext::TypeDesc& return_type,
-    const vector<FunctionContext::TypeDesc>& arg_types, RuntimeState* state) {
-  return FunctionContextImpl::CreateContext(state, NULL, return_type, arg_types, 0, true);
+    const vector<FunctionContext::TypeDesc>& arg_types, RuntimeState* state,
+    MemPool* pool) {
+  return FunctionContextImpl::CreateContext(state, pool, return_type, arg_types, 0, true);
 }
 
 void UdfTestHarness::SetConstantArgs(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/be/src/udf/udf-test-harness.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-test-harness.h b/be/src/udf/udf-test-harness.h
index 668a44d..f730d98 100644
--- a/be/src/udf/udf-test-harness.h
+++ b/be/src/udf/udf-test-harness.h
@@ -28,6 +28,8 @@
 #include "udf/udf.h"
 #include "udf/udf-debug.h"
 
+namespace impala { class MemPool; }
+
 namespace impala_udf {
 
 /// Utility class to help test UDFs.
@@ -38,7 +40,7 @@ class UdfTestHarness {
   /// for calling delete on it. This context has additional debugging validation enabled.
   static FunctionContext* CreateTestContext(const FunctionContext::TypeDesc& return_type,
       const std::vector<FunctionContext::TypeDesc>& arg_types,
-      impala::RuntimeState* state = nullptr);
+      impala::RuntimeState* state = nullptr, impala::MemPool* pool = nullptr);
 
   /// Use with test contexts to test use of IsArgConstant() and GetConstantArg().
   /// constant_args should contain an AnyVal* for each argument of the UDF not including

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index c2a5270..a013638 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -17,6 +17,7 @@
 
 #include "udf/udf.h"
 
+#include <algorithm>
 #include <iostream>
 #include <sstream>
 #include <assert.h>
@@ -449,6 +450,29 @@ uint8_t* FunctionContextImpl::AllocateLocal(int64_t byte_size) noexcept {
   return buffer;
 }
 
+uint8_t* FunctionContextImpl::ReallocateLocal(uint8_t* ptr, int64_t byte_size) noexcept {
+  assert(!closed_);
+  uint8_t* new_ptr  = pool_->Reallocate(ptr, byte_size);
+  if (UNLIKELY(!CheckAllocResult("FunctionContextImpl::ReallocateLocal",
+      new_ptr, byte_size))) {
+    return NULL;
+  }
+  if (new_ptr != ptr) {
+    auto v = std::find(local_allocations_.rbegin(), local_allocations_.rend(), ptr);
+    assert(v != local_allocations_.rend());
+    // Avoid perf issue; move to end of local allocations on any reallocation and
+    // always start the search from there.
+    if (v != local_allocations_.rbegin()) {
+      *v = *local_allocations_.rbegin();
+    }
+    *local_allocations_.rbegin() = new_ptr;
+  }
+  VLOG_ROW << "Reallocate Local: FunctionContext=" << context_
+           << " ptr=" << reinterpret_cast<void*>(ptr) << " size=" << byte_size
+           << " result=" << reinterpret_cast<void*>(new_ptr);
+  return new_ptr;
+}
+
 void FunctionContextImpl::FreeLocalAllocations() noexcept {
   assert(!closed_);
   if (VLOG_ROW_IS_ON) {
@@ -479,7 +503,6 @@ void FunctionContextImpl::SetNonConstantArgs(
 // expr-ir.cc. This could probably use further investigation.
 StringVal::StringVal(FunctionContext* context, int len) noexcept : len(len), ptr(NULL) {
   if (UNLIKELY(len > StringVal::MAX_LENGTH)) {
-    std::cout << "MAX_LENGTH, Trying to allocate " << len;
     context->SetError("String length larger than allowed limit of "
                       "1 GB character data.");
     len = 0;
@@ -504,6 +527,22 @@ StringVal StringVal::CopyFrom(FunctionContext* ctx, const uint8_t* buf, size_t l
   return result;
 }
 
+bool StringVal::Resize(FunctionContext* ctx, int new_len) noexcept {
+  if (UNLIKELY(new_len > StringVal::MAX_LENGTH)) {
+    ctx->SetError("String length larger than allowed limit of 1 GB character data.");
+    len = 0;
+    is_null = true;
+    return false;
+  }
+  auto* new_ptr = ctx->impl()->ReallocateLocal(ptr, new_len);
+  if (new_ptr != nullptr) {
+    ptr = new_ptr;
+    len = new_len;
+    return true;
+  }
+  return false;
+}
+
 // TODO: why doesn't libudasample.so build if this in udf-ir.cc?
 const FunctionContext::TypeDesc* FunctionContext::GetArgType(int arg_idx) const {
   if (arg_idx < 0 || arg_idx >= impl_->arg_types_.size()) return NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/be/src/udf/udf.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index 4fdca2d..d4d6602 100644
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -582,8 +582,22 @@ struct StringVal : public AnyVal {
   /// If the memory allocation fails, e.g. because the intermediate value would be too
   /// large, the constructor will construct a NULL string and set an error on the function
   /// context.
+  ///
+  /// The memory backing this StringVal is a local allocation, and so doesn't need
+  /// to be explicitly freed.
   StringVal(FunctionContext* context, int len) noexcept;
 
+  /// Reallocate a StringVal that is backed by a local allocation so that it as
+  /// at least as large as len.  May shrink or / expand the string.  If the
+  /// string is expanded, the content of the new space is undefined.
+  ///
+  /// If the resize fails, the original StringVal remains in place.  Callers do not
+  /// otherwise need to be concerned with backing storage, which is allocated from a
+  /// local allocation.
+  ///
+  /// Returns true on success, false on failure.
+  bool Resize(FunctionContext* context, int len) noexcept;
+
   /// Will create a new StringVal with the given dimension and copy the data from the
   /// parameters. In case of an error will return a NULL string and set an error on the
   /// function context.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/common/function-registry/impala_functions.py
----------------------------------------------------------------------
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index 2141a35..fbe6719 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -423,6 +423,9 @@ visible_functions = [
   [['lower', 'lcase'], 'STRING', ['STRING'], 'impala::StringFunctions::Lower'],
   [['upper', 'ucase'], 'STRING', ['STRING'], 'impala::StringFunctions::Upper'],
   [['initcap'], 'STRING', ['STRING'], 'impala::StringFunctions::InitCap'],
+  [['replace'], 'STRING', ['STRING', 'STRING', 'STRING'], 'impala::StringFunctions::Replace',
+   '_ZN6impala15StringFunctions14ReplacePrepareEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE',
+   '_ZN6impala15StringFunctions12ReplaceCloseEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE'],
   [['reverse'], 'STRING', ['STRING'], 'impala::StringFunctions::Reverse'],
   [['translate'], 'STRING', ['STRING', 'STRING', 'STRING'],
    'impala::StringFunctions::Translate'],

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index adef592..867850d 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -2633,9 +2633,11 @@ non_pred_expr ::=
   {: RESULT = e; :}
   | analytic_expr:e
   {: RESULT = e; :}
-  /* Since "IF", "TRUNCATE" are keywords, need to special case these functions */
+  /* Since "IF", "REPLACE", "TRUNCATE" are keywords, need to special case these functions */
   | KW_IF LPAREN expr_list:exprs RPAREN
   {: RESULT = new FunctionCallExpr("if", exprs); :}
+  | KW_REPLACE LPAREN expr_list:exprs RPAREN
+  {: RESULT = new FunctionCallExpr("replace", exprs); :}
   | KW_TRUNCATE LPAREN expr_list:exprs RPAREN
   {: RESULT = new FunctionCallExpr("truncate", exprs); :}
   | cast_expr:c

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 14cd036..d0176d2 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3027,7 +3027,7 @@ public class ParserTest extends FrontendTestBase {
         "       ^\n" +
         "Encountered: FROM\n" +
         "Expected: ALL, CASE, CAST, DEFAULT, DISTINCT, EXISTS, " +
-        "FALSE, IF, INTERVAL, NOT, NULL, " +
+        "FALSE, IF, INTERVAL, NOT, NULL, REPLACE, " +
         "STRAIGHT_JOIN, TRUNCATE, TRUE, IDENTIFIER\n");
 
     // missing from
@@ -3054,7 +3054,7 @@ public class ParserTest extends FrontendTestBase {
         "                           ^\n" +
         "Encountered: EOF\n" +
         "Expected: CASE, CAST, DEFAULT, EXISTS, FALSE, " +
-        "IF, INTERVAL, NOT, NULL, TRUNCATE, TRUE, IDENTIFIER\n");
+        "IF, INTERVAL, NOT, NULL, REPLACE, TRUNCATE, TRUE, IDENTIFIER\n");
 
     // missing predicate in where clause (group by)
     ParserError("select c, b, c from t where group by a, b",
@@ -3063,7 +3063,7 @@ public class ParserTest extends FrontendTestBase {
         "                            ^\n" +
         "Encountered: GROUP\n" +
         "Expected: CASE, CAST, DEFAULT, EXISTS, FALSE, " +
-        "IF, INTERVAL, NOT, NULL, TRUNCATE, TRUE, IDENTIFIER\n");
+        "IF, INTERVAL, NOT, NULL, REPLACE, TRUNCATE, TRUE, IDENTIFIER\n");
 
     // unmatched string literal starting with "
     ParserError("select c, \"b, c from t",
@@ -3124,7 +3124,7 @@ public class ParserTest extends FrontendTestBase {
         "                             ^\n" +
         "Encountered: COMMA\n" +
         "Expected: CASE, CAST, DEFAULT, EXISTS, FALSE, " +
-        "IF, INTERVAL, NOT, NULL, TRUNCATE, TRUE, IDENTIFIER\n");
+        "IF, INTERVAL, NOT, NULL, REPLACE, TRUNCATE, TRUE, IDENTIFIER\n");
 
     // Parsing identifiers that have different names printed as EXPECTED
     ParserError("DROP DATA SRC foo",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/testdata/workloads/functional-query/queries/QueryTest/exprs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exprs.test b/testdata/workloads/functional-query/queries/QueryTest/exprs.test
index d611884..fa3d225 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/exprs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/exprs.test
@@ -1931,6 +1931,33 @@ CAUSED BY: InternalException: Could not compile regexp pattern: *
 Error: no argument for repetition operator: *
 ====
 ---- QUERY
+select sum(length(replace(y, x, 'bbbbbbbbbbb')))
+from (select cast(round(float_col) AS STRING) as x, string_col as y
+      from functional.alltypes) v;
+---- TYPES
+BIGINT
+---- RESULTS
+43800
+====
+---- QUERY
+select sum(length(replace(y, '0', x)))
+from (select cast(round(float_col) AS STRING) as x, string_col as y
+      from functional.alltypes) v;
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+select sum(length(concat(replace(y, '0', x), replace(y, '0', x))))
+from (select cast(round(float_col) AS STRING) as x, string_col as y
+      from functional.alltypes) v;
+---- TYPES
+BIGINT
+---- RESULTS
+14600
+====
+---- QUERY
 # Test for factorial operator
 select distinct int_col, int_col! from functional.alltypes order by 1
 ---- RESULTS

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0715a303/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/large_strings.test b/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
index 5c90869..4419953 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
@@ -164,6 +164,24 @@ select length(regexp_replace(s, '.', '++++++++')) from (
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+select length(replace(s, ' ', '++++++++')) from (
+  select group_concat(l_comment, "!") s from (
+    select l_comment from tpch.lineitem union all
+    select l_comment from tpch.lineitem) t1
+  ) t2
+---- RESULTS
+625718301
+=====
+---- QUERY
+select replace(x, '+', '000') from (select (replace(s, ' ', '++++++++')) x from (
+  select group_concat(l_comment, "!") s from (
+    select l_comment from tpch.lineitem union all
+    select l_comment from tpch.lineitem) t1
+  ) t2) t3;
+---- CATCH
+String length larger than allowed limit of 1 GB character data
+=====
+---- QUERY
 select trunc(timestamp_col, space(1073741830)) from functional.alltypes
 ---- CATCH
 String length larger than allowed limit of 1 GB character data


[5/5] incubator-impala git commit: IMPALA-4840: Fix REFRESH performance regression.

Posted by lv...@apache.org.
IMPALA-4840: Fix REFRESH performance regression.

The fix for IMPALA-4172 introduced a regression in
performance of the REFRESH command. The regression
stems from the fact that we reload the block metadata
of every valid data file without considering whether it
has changed since the last load. This caused unnecessary
metadata loads for unchanged files and thus increasing
the runtime.

The fix involves having the refresh codepath (and other
operations that use the same codepath like insert etc.) to
reload the metadata of only modified files by doing a
listStatus() on the partition directory and checking the
last modified time of each file. Without this patch, we relied
on listFiles(), which fetched the block locations irrespective of
whether the file has changed and it was significantly slower on
unchanged tables. The initial/invalidate metadata load still
fetches the block locations in bulk using listFiles(). The
side effect of this change is that the refresh no longer picks up
block location changes after HDFS block rebalancing. We suggest
using "invalidate metadata" for that which loads the metadata from
scratch.

Additionally, this commit enables the reuse of metadata during
table refresh (which was disabled in IMPALA-4172) to prevent
reloading metadata from HMS everytime.

Change-Id: I859b9fe93563ba886d0b5db6db42a14c88caada8
Reviewed-on: http://gerrit.cloudera.org:8080/6009
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/26eaa266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/26eaa266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/26eaa266

Branch: refs/heads/master
Commit: 26eaa266092a5d8b37e21fd19dfbae81a952ac74
Parents: bd1d445
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Thu Feb 9 22:54:40 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 16 04:52:54 2017 +0000

----------------------------------------------------------------------
 .../impala/catalog/CatalogServiceCatalog.java   |   2 +-
 .../org/apache/impala/catalog/HdfsTable.java    | 140 +++++++++++++++----
 2 files changed, 110 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/26eaa266/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 2c42874..8be0aa3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -974,7 +974,7 @@ public class CatalogServiceCatalog extends Catalog {
           throw new TableLoadingException("Error loading metadata for table: " +
               db.getName() + "." + tblName.getTable_name(), e);
         }
-        tbl.load(false, msClient.getHiveClient(), msTbl);
+        tbl.load(true, msClient.getHiveClient(), msTbl);
       }
       tbl.setCatalogVersion(newCatalogVersion);
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/26eaa266/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 795dae2..6096ba9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -81,8 +81,10 @@ import org.apache.impala.util.TResultRowBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -311,28 +313,10 @@ public class HdfsTable extends Table {
         FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(),
             fileStatus.getModificationTime());
         BlockLocation[] locations = fileStatus.getBlockLocations();
-        String partPathDirName = partPathDir.toString();
-        for (BlockLocation loc: locations) {
-          Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
-          // Enumerate all replicas of the block, adding any unknown hosts
-          // to hostIndex_. We pick the network address from getNames() and
-          // map it to the corresponding hostname from getHosts().
-          List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
-              loc.getNames().length);
-          for (int i = 0; i < loc.getNames().length; ++i) {
-            TNetworkAddress networkAddress =
-                BlockReplica.parseLocation(loc.getNames()[i]);
-            replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
-                cachedHosts.contains(loc.getHosts()[i])));
-          }
-          FileBlock currentBlock =
-              new FileBlock(loc.getOffset(), loc.getLength(), replicas);
-          THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
-          fd.addThriftFileBlock(tHdfsFileBlock);
-          unknownDiskIdCount += loadDiskIds(loc, tHdfsFileBlock);
-        }
+        unknownDiskIdCount += setFdBlockMetadata(fd, locations);
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Adding file md dir: " + partPathDirName + " file: " + fileName);
+          LOG.trace("Adding file md dir: " + partPathDir.toString() + " file: " +
+              fileName);
         }
         // Update the partitions' metadata that this file belongs to.
         for (HdfsPartition partition: partitions) {
@@ -354,6 +338,35 @@ public class HdfsTable extends Table {
   }
 
   /**
+   * Sets the block metadata for FileDescriptor 'fd' using block location metadata
+   * from 'locations'.
+   */
+  private int setFdBlockMetadata(FileDescriptor fd, BlockLocation[] locations)
+      throws IOException {
+    int unknownFdDiskIds = 0;
+    for (BlockLocation loc: locations) {
+      Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
+      // Enumerate all replicas of the block, adding any unknown hosts
+      // to hostIndex_. We pick the network address from getNames() and
+      // map it to the corresponding hostname from getHosts().
+      List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
+          loc.getNames().length);
+      for (int i = 0; i < loc.getNames().length; ++i) {
+        TNetworkAddress networkAddress =
+            BlockReplica.parseLocation(loc.getNames()[i]);
+        replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
+            cachedHosts.contains(loc.getHosts()[i])));
+      }
+      FileBlock currentBlock =
+          new FileBlock(loc.getOffset(), loc.getLength(), replicas);
+      THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
+      fd.addThriftFileBlock(tHdfsFileBlock);
+      unknownFdDiskIds += loadDiskIds(loc, tHdfsFileBlock);
+    }
+    return unknownFdDiskIds;
+  }
+
+  /**
    * Loads the disk IDs for BlockLocation 'location' and its corresponding file block.
    * HDFS API for BlockLocation returns a storageID UUID string for each disk
    * hosting the block, which is then mapped to a 0-based integer id called disk ID.
@@ -388,6 +401,20 @@ public class HdfsTable extends Table {
   }
 
   /**
+   * Synthesize the block metadata for a given HdfsPartition object. Should only
+   * be called for FileSystems that do not support storage IDs.
+   */
+  private void synthesizeBlockMetadata(FileSystem fs, HdfsPartition partition)
+      throws IOException {
+    Preconditions.checkState(!FileSystemUtil.supportsStorageIds(fs));
+    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
+    Path partitionPath = partition.getLocationPath();
+    partition.setFileDescriptors(new ArrayList<FileDescriptor>());
+    partsByPath.put(partitionPath, Lists.newArrayList(partition));
+    synthesizeBlockMetadata(fs, partitionPath, partsByPath);
+  }
+
+  /**
    * For filesystems that don't support BlockLocation API, synthesize file blocks
    * by manually splitting the file range into fixed-size blocks.  That way, scan
    * ranges can be derived from file blocks as usual.  All synthesized blocks are given
@@ -755,11 +782,62 @@ public class HdfsTable extends Table {
     loadMetadataAndDiskIds(dirsToLoad, partsByPath);
   }
 
-  private void loadMetadataAndDiskIds(HdfsPartition partition) throws CatalogException {
-    Path partDirPath = partition.getLocationPath();
-    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    partsByPath.put(partDirPath, Lists.newArrayList(partition));
-    loadMetadataAndDiskIds(Lists.newArrayList(partDirPath), partsByPath);
+  /**
+   * Refreshes block metadata information for 'partition'. This method is optimized for
+   * the case where the files in the partition have not changed dramatically. It first
+   * uses a listStatus() call on the partition directory to detect files with changed
+   * mtime and fetches their block locations using the getFileBlockLocations() method.
+   * Our benchmarks suggest that the listStatus() call is much faster then the listFiles()
+   * (up to ~40x faster in some cases). The initial table load still uses the listFiles()
+   * on the data directory that fetches both the FileStatus as well as BlockLocations in
+   * a single call.
+   */
+  private void refreshFileMetadata(HdfsPartition partition) throws CatalogException {
+    Path partDir = partition.getLocationPath();
+    Preconditions.checkNotNull(partDir);
+    try {
+      FileSystem fs = partDir.getFileSystem(CONF);
+      if (!fs.exists(partDir)) {
+        partition.setFileDescriptors(new ArrayList<FileDescriptor>());
+        return;
+      }
+      if (!FileSystemUtil.supportsStorageIds(fs)) {
+        synthesizeBlockMetadata(fs, partition);
+        return;
+      }
+      // Index the partition file descriptors by their file names for O(1) look ups.
+      ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
+          partition.getFileDescriptors(), new Function<FileDescriptor, String>() {
+            public String apply(FileDescriptor desc) {
+              return desc.getFileName();
+            }
+          });
+      // Iterate through the current snapshot of the partition directory listing to
+      // figure out files that were newly added/modified.
+      List<FileDescriptor> newFileDescs = Lists.newArrayList();
+      int newPartSizeBytes = 0;
+      for (FileStatus fileStatus : fs.listStatus(partDir)) {
+        if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
+        String fileName = fileStatus.getPath().getName().toString();
+        FileDescriptor fd = fileDescsByName.get(fileName);
+        if (fd == null || partition.isMarkedCached() ||
+            fd.getFileLength() != fileStatus.getLen() ||
+            fd.getModificationTime() != fileStatus.getModificationTime()) {
+          fd = new FileDescriptor(fileName, fileStatus.getLen(),
+              fileStatus.getModificationTime());
+          setFdBlockMetadata(fd,
+              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()));
+        }
+        newFileDescs.add(fd);
+        newPartSizeBytes += fileStatus.getLen();
+      }
+      partition.setFileDescriptors(newFileDescs);
+      numHdfsFiles_ += newFileDescs.size();
+      totalHdfsBytes_ += newPartSizeBytes;
+    } catch(IOException e) {
+      throw new CatalogException("Error loading block metadata for partition " +
+          partition.toString(), e);
+    }
   }
 
   /**
@@ -772,7 +850,7 @@ public class HdfsTable extends Table {
     LOG.info(String.format(
         "Loading file and block metadata for %s partitions from %s paths: %s",
         partsByPath.size(), locations.size(), getFullName()));
-    for (Path location: locations) { loadBlockMetadata(location, partsByPath); }
+    for (Path location: locations) loadBlockMetadata(location, partsByPath);
     LOG.info(String.format(
         "Loaded file and block metadata for %s partitions from %s paths: %s",
         partsByPath.size(), locations.size(), getFullName()));
@@ -831,7 +909,7 @@ public class HdfsTable extends Table {
       org.apache.hadoop.hive.metastore.api.Partition msPartition)
       throws CatalogException {
     HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition);
-    loadMetadataAndDiskIds(hdfsPartition);
+    refreshFileMetadata(hdfsPartition);
     return hdfsPartition;
   }
 
@@ -1119,8 +1197,8 @@ public class HdfsTable extends Table {
     addDefaultPartition(msTbl.getSd());
     HdfsPartition part = createPartition(msTbl.getSd(), null);
     addPartition(part);
-    loadMetadataAndDiskIds(part);
     if (isMarkedCached_) part.markCached();
+    refreshFileMetadata(part);
   }
 
   /**
@@ -1436,7 +1514,7 @@ public class HdfsTable extends Table {
         // WRITE_ONLY the table's access level should be NONE.
         accessLevel_ = TAccessLevel.READ_ONLY;
       }
-      loadMetadataAndDiskIds(partition);
+      refreshFileMetadata(partition);
     }
   }
 
@@ -1492,7 +1570,7 @@ public class HdfsTable extends Table {
     numHdfsFiles_ -= partition.getNumFileDescriptors();
     totalHdfsBytes_ -= partition.getSize();
     Preconditions.checkState(numHdfsFiles_ >= 0 && totalHdfsBytes_ >= 0);
-    loadMetadataAndDiskIds(partition);
+    refreshFileMetadata(partition);
   }
 
   @Override


[3/5] incubator-impala git commit: IMPALA-4916: Fix maintenance of set of item sets in DisjointSet.

Posted by lv...@apache.org.
IMPALA-4916: Fix maintenance of set of item sets in DisjointSet.

The bug: The DisjointSet maintains a set of unique item sets
using a HashSet<Set<T>>. The problem is that we modified
the Set<T> elements after inserting them into the HashSet.
This caused the removal of elements from the HashSet to fail.
Removal is required for maintaining a consistent DisjointSet.
The removal could even fail for the same Set<T> instance because
the hashCode() changed from when the Set<T> was originally
inserted to when the removal was attempted due to mutation
of the Set<T>.
An inconsistent DisjointSet can lead to incorrect equivalence
classes, which can lead to missing, redundant and even
non-executable predicates. Incorrect results and crashes are
possible.

For most queries, an inconsistent DisjointSet does not alter
the equivalence classes, and even fewer queries have incorrect
plans.
In fact, many of our existing planner tests trigger this bug,
but only 3 of them lead to an incorrect value transfer graph,
and all 3 had correct plans.

The fix: Use an IdentityHashMap to store the set of item sets.
It does not rely on the hashCode() and equals() of the stored
elements, so the same object can be added and later removed,
even when mutated in the meantime.

Testing:
- Added a Preconditions check in DisjointSet that asserts
  correct removal of an item set. Many of our existing tests
  hit the check before this fix.
- Added a new unit test for DisjointSet which triggers
  the bug.
- Augmented DisjointSet.checkConsistency() to check for
  inconsistency in the set of item sets.
- Added validation of the value-transfer graph in
  single-node planner tests.
- A private core/hdfs run succeeded.

Change-Id: I609c8795c09becd78815605ea8e82e2f99e82212
Reviewed-on: http://gerrit.cloudera.org:8080/5980
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ffd297bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ffd297bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ffd297bd

Branch: refs/heads/master
Commit: ffd297bdea5d58db3d1791a2528b7f1009a459c6
Parents: cec3fe3
Author: Alex Behm <al...@cloudera.com>
Authored: Fri Feb 10 07:05:07 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 15 22:16:30 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/Analyzer.java    |  9 ++++-
 .../main/java/org/apache/impala/common/Id.java  |  4 +--
 .../org/apache/impala/util/DisjointSet.java     | 37 +++++++++++++++++---
 .../org/apache/impala/planner/PlannerTest.java  |  2 +-
 .../org/apache/impala/util/TestDisjointSet.java | 17 +++++++++
 .../queries/PlannerTest/joins.test              | 15 ++++----
 6 files changed, 65 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ffd297bd/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 34cf565..2809851 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -56,6 +56,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
+import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.rewrite.BetweenToCompoundRule;
 import org.apache.impala.rewrite.ExprRewriter;
@@ -1979,6 +1980,12 @@ public class Analyzer {
     globalState_.valueTransferGraph = new ValueTransferGraph();
     globalState_.valueTransferGraph.computeValueTransfers();
 
+    // Validate the value-transfer graph in single-node planner tests.
+    if (RuntimeEnv.INSTANCE.isTestEnv() && getQueryOptions().num_nodes == 1) {
+      Preconditions.checkState(validateValueTransferGraph(),
+          "Failed to validate value-transfer graph.");
+    }
+
     // we start out by assigning each slot to its own equiv class
     int numSlots = globalState_.descTbl.getMaxSlotId().asInt() + 1;
     for (int i = 0; i < numSlots; ++i) {
@@ -2941,7 +2948,7 @@ public class Analyzer {
       for (Pair<SlotId, SlotId> vt: valueTransfers) {
         expectedValueTransfer[vt.first.asInt()][vt.second.asInt()] = true;
       }
-      // Set registered value tranfers in expectedValueTransfer.
+      // Set registered value transfers in expectedValueTransfer.
       for (Pair<SlotId, SlotId> vt: globalState_.registeredValueTransfers) {
         expectedValueTransfer[vt.first.asInt()][vt.second.asInt()] = true;
       }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ffd297bd/fe/src/main/java/org/apache/impala/common/Id.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/Id.java b/fe/src/main/java/org/apache/impala/common/Id.java
index c7357aa..b89b56d 100644
--- a/fe/src/main/java/org/apache/impala/common/Id.java
+++ b/fe/src/main/java/org/apache/impala/common/Id.java
@@ -36,9 +36,7 @@ public class Id<IdType extends Id<IdType>> implements Comparable<Id<IdType>> {
   public int asInt() { return id_; }
 
   @Override
-  public int hashCode() {
-    return Integer.valueOf(id_).hashCode();
-  }
+  public int hashCode() { return id_; }
 
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ffd297bd/fe/src/main/java/org/apache/impala/util/DisjointSet.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/DisjointSet.java b/fe/src/main/java/org/apache/impala/util/DisjointSet.java
index d5543b1..1665cc7 100644
--- a/fe/src/main/java/org/apache/impala/util/DisjointSet.java
+++ b/fe/src/main/java/org/apache/impala/util/DisjointSet.java
@@ -18,10 +18,12 @@
 package org.apache.impala.util;
 
 import java.util.Collection;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -35,7 +37,18 @@ import com.google.common.collect.Sets;
 public class DisjointSet<T> {
   // Maps from an item to its item set.
   private final Map<T, Set<T>> itemSets_ = Maps.newHashMap();
-  private final Set<Set<T>> uniqueSets_ = Sets.newHashSet();
+
+  // Stores the set of item sets in its keySet(). All keys are mapped to the same
+  // dummy value which is only used for validating the removal of entries.
+  // An IdentityHashMap is needed here instead of a regular HashSet because the Set<T>
+  // elements are mutated after inserting them. In a conventional HashSet mutating
+  // elements after they are inserted may cause problems because the hashCode() of the
+  // object during insertion may be different than its hashCode() after mutation. So,
+  // after changing the element it may not be possible to remove/find it again in the
+  // HashSet (looking in the wrong hash bucket), even using the same object reference.
+  private final IdentityHashMap<Set<T>, Object> uniqueSets_ =
+      new IdentityHashMap<Set<T>, Object>();
+  private static final Object DUMMY_VALUE = new Object();
 
   /**
    * Returns the item set corresponding to the given item or null if it
@@ -43,7 +56,7 @@ public class DisjointSet<T> {
    */
   public Set<T> get(T item) { return itemSets_.get(item); }
 
-  public Set<Set<T>> getSets() { return uniqueSets_; }
+  public Set<Set<T>> getSets() { return uniqueSets_.keySet(); }
 
   /**
    * Registers a new item set with a single item. Returns the new item set.
@@ -56,7 +69,7 @@ public class DisjointSet<T> {
     }
     Set<T> s = Sets.newHashSet(item);
     itemSets_.put(item, s);
-    uniqueSets_.add(s);
+    uniqueSets_.put(s, DUMMY_VALUE);
     return s;
   }
 
@@ -94,7 +107,8 @@ public class DisjointSet<T> {
       mergedItems.add(item);
       itemSets_.put(item, mergedItems);
     }
-    uniqueSets_.remove(updateItems);
+    Object removedValue = uniqueSets_.remove(updateItems);
+    Preconditions.checkState(removedValue == DUMMY_VALUE);
     return true;
   }
 
@@ -125,6 +139,7 @@ public class DisjointSet<T> {
    * Throws an IllegalStateException if an inconsistency is detected.
    */
   public void checkConsistency() {
+    // Validate map from item to item set.
     Set<Set<T>> validatedSets = Sets.newHashSet();
     for (Set<T> itemSet: itemSets_.values()) {
       // Avoid checking the same item set multiple times.
@@ -133,10 +148,22 @@ public class DisjointSet<T> {
       // the set itself.
       for (T item: itemSet) {
         if (itemSet != itemSets_.get(item)) {
-          throw new IllegalStateException("DisjointSet is in an inconsistent state.");
+          throw new IllegalStateException(
+              "DisjointSet is in an inconsistent state. Failed item set validation.");
         }
       }
       validatedSets.add(itemSet);
     }
+
+    // Validate set of item sets. Every element should appear in exactly one item set.
+    Set<T> seenItems = Sets.newHashSet();
+    for (Set<T> itemSet: uniqueSets_.keySet()) {
+      for (T item: itemSet) {
+        if (!seenItems.add(item)) {
+          throw new IllegalStateException(
+              "DisjointSet is in an inconsistent state. Failed unique set validation.");
+        }
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ffd297bd/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 8eabe8a..a88a3cd 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -31,7 +31,7 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
 
 // All planner tests, except for S3 specific tests should go here.
 public class PlannerTest extends PlannerTestBase {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ffd297bd/fe/src/test/java/org/apache/impala/util/TestDisjointSet.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/TestDisjointSet.java b/fe/src/test/java/org/apache/impala/util/TestDisjointSet.java
index a932dfc..0dc290e 100644
--- a/fe/src/test/java/org/apache/impala/util/TestDisjointSet.java
+++ b/fe/src/test/java/org/apache/impala/util/TestDisjointSet.java
@@ -157,4 +157,21 @@ public class TestDisjointSet {
         && ds.get(1).containsAll(Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8)));
     ds.checkConsistency();
   }
+
+  /**
+   * IMPALA-4916: Tests that the set of item sets is maintained correctly.
+   */
+  @Test
+  public void testUniqueSets() throws Exception {
+    DisjointSet<Integer> ds = new DisjointSet<Integer>();
+
+    int uniqueElements = 100;
+    // Generate several 2-element item sets.
+    for (int i = 0; i < uniqueElements; i += 2) ds.union(i, i + 1);
+    // Connect several item sets into one big item set. This stresses the logic
+    // of maintaining the set of item sets.
+    for (int i = uniqueElements/2; i < uniqueElements; ++i) ds.union(i, i + 1);
+
+    ds.checkConsistency();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ffd297bd/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
index 8161833..f28ecb0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
@@ -1171,12 +1171,7 @@ PLAN-ROOT SINK
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> a.id, RF001 -> a.int_col
 ====
-# Test correct removal of redundant join predicates (IMPALA-1353):
-# No predicates are removed from the On-clause. The resulting plan is correct
-# but inconsistent with behavior of Impala behavior for other queries.
-# TODO: Investigate why the equivalence class computation appears to not
-# work for the slots of tables a and b. We expect predicates such as
-# 'id+id = tinyint_col' to be assigned in the union operands.
+# IMPALA-1353/IMPALA-4916: Test correct removal of redundant join predicates.
 select 1
 from functional.alltypes a
 inner join
@@ -1191,20 +1186,22 @@ on a.id = b.x and a.id = b.tinyint_col and
 PLAN-ROOT SINK
 |
 04:HASH JOIN [INNER JOIN]
-|  hash predicates: a.id = tinyint_col, a.id = x, a.int_col = bigint_col, a.int_col = y
-|  runtime filters: RF000 <- tinyint_col, RF001 <- x, RF002 <- bigint_col, RF003 <- y
+|  hash predicates: a.id = x, a.int_col = y
+|  runtime filters: RF000 <- x, RF001 <- y
 |
 |--01:UNION
 |  |
 |  |--03:SCAN HDFS [functional.alltypestiny]
 |  |     partitions=4/4 files=4 size=460B
+|  |     predicates: id - id = tinyint_col, int_col / int_col = bigint_col
 |  |
 |  02:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
+|     predicates: int_col * int_col = bigint_col, id + id = tinyint_col
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
-   runtime filters: RF000 -> a.id, RF001 -> a.id, RF002 -> a.int_col, RF003 -> a.int_col
+   runtime filters: RF000 -> a.id, RF001 -> a.int_col
 ====
 # Test creation of predicates at a join node for constructing the
 # minimum spanning tree to cover known slot equivalences (IMPALA-1102).


[2/5] incubator-impala git commit: IMPALA-4809: Enable support for DECIMAL_V2 in decimal_casting.py

Posted by lv...@apache.org.
IMPALA-4809: Enable support for DECIMAL_V2 in decimal_casting.py

This change enables decimal_v2 for casting to decimal except
for cases which involve rounding/truncation. In which case,
we only enable decimal_v2 for casting from numbers to decimal.
Due to prior misunderstanding, some of the changes made in
commit 2088cf2 weren't necessary. By default, Impala interprets
floating numbers as decimal unless they exceed the maximum
decimal type supported. This change also removess a minor quirk:
previously, the test will revert to using string to decimal
casting for numbers larger than 2^63-1. This is unnecessary as
Impala is able to treat numbers larger than that bound as
decimal type.

Change-Id: Icf2c8c9d360ad92cbdc5ce902ee742ec4408a8a3
Reviewed-on: http://gerrit.cloudera.org:8080/5989
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cec3fe35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cec3fe35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cec3fe35

Branch: refs/heads/master
Commit: cec3fe35740ed76065ac60f0d28ea3ca98c5e600
Parents: 0715a30
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Feb 13 15:11:14 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 15 10:12:58 2017 +0000

----------------------------------------------------------------------
 tests/query_test/test_decimal_casting.py | 63 ++++++++++++---------------
 1 file changed, 27 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cec3fe35/tests/query_test/test_decimal_casting.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_decimal_casting.py b/tests/query_test/test_decimal_casting.py
index fac49c5..ce8037d 100644
--- a/tests/query_test/test_decimal_casting.py
+++ b/tests/query_test/test_decimal_casting.py
@@ -18,7 +18,7 @@
 # Validates that casting to Decimal works.
 #
 import pytest
-from decimal import Decimal, getcontext, ROUND_DOWN
+from decimal import Decimal, getcontext, ROUND_DOWN, ROUND_HALF_UP
 from metacomm.combinatorics.all_pairs2 import all_pairs2 as all_pairs
 from random import randint
 
@@ -31,8 +31,9 @@ class TestDecimalCasting(ImpalaTestSuite):
 
   Specifically, this test suite ensures that:
     - overflows and underflows and handled correctly.
-    - casts from floats/string to their exact decimal types are correct.
+    - casts from decimal/string to their exact decimal types are correct.
     - max/min/NULL/0 can be expressed with their respective decimal types.
+    - TODO: Add cases for cast from float/double to decimal types.
   """
   DECIMAL_TYPES_MAP = {
       # All possible decimal types.
@@ -43,9 +44,9 @@ class TestDecimalCasting(ImpalaTestSuite):
       # mimics test_vectors.py and takes a subset of all decimal types
       'pairwise' : all_pairs([(p, s) for p in xrange(1, 39) for s in xrange(0, p + 1)])
   }
-  # We can cast for numerics, string or decimal types.
-  CAST_FROM = ['string', 'number', 'decimal']
-  # Set the default precisin to 38 to operate on decimal values.
+  # We can cast for numerics or string types.
+  CAST_FROM = ['string', 'number']
+  # Set the default precision to 38 to operate on decimal values.
   getcontext().prec = 38
   # Represents a 0 in decimal
   DECIMAL_ZERO = Decimal('0')
@@ -62,12 +63,9 @@ class TestDecimalCasting(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('cast_from', *TestDecimalCasting.CAST_FROM))
     cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension_from_dict(
-        {'decimal_v2': ['false']}))
+        {'decimal_v2': ['false','true']}))
     cls.iterations = 1
 
-  def setup_method(self, method):
-    self.max_bigint = int(self.execute_scalar("select max_bigint()"))
-
   def _gen_decimal_val(self, precision, scale):
     """Generates a Decimal object with the exact number of digits as the precision."""
     # Generates numeric string which has as many digits as the precision.
@@ -79,22 +77,14 @@ class TestDecimalCasting(ImpalaTestSuite):
     return Decimal(num) if randint(0,1) else Decimal("-{0}".format(num))
 
   def _assert_decimal_result(self, cast, actual, expected):
-    assert actual == expected, "Cast: {0}, Expected: {1}, Actual: {2}".format(cast,\
+    assert expected == actual, "Cast: {0}, Expected: {1}, Actual: {2}".format(cast,\
         expected, actual)
 
-  def _normalize_cast_expr(self, decimal_val, scale, cast_from):
-    """Convert the decimal value to a string litetal to avoid overflow.
-
-    If an integer literal is greater than the max bigint supported by Impala, it
-    overflows. This methods replaces it with a string literal.
-    """
-    # Decimal({1},{2}) is the target type to cast to. If casting from decimal,
-    # Decimal({3},{4}) is the intermediate type to cast {0} to.
-    if (scale == 0 and abs(decimal_val) > self.max_bigint) or (cast_from == 'string'):
+  def _normalize_cast_expr(self, decimal_val, precision, cast_from):
+    # Due to IMPALA-4936, casts from double which overflows decimal type don't work
+    # reliably. So casting from string for now until IMPALA-4936 is fixed.
+    if precision > 38 or cast_from == 'string':
       return "select cast('{0}' as Decimal({1},{2}))"
-    elif cast_from == 'decimal':
-      base_cast = "cast('{0}' as Decimal({3},{4}))"
-      return "select cast(" + base_cast + " as Decimal({1},{2}))"
     else:
       return "select cast({0} as Decimal({1},{2}))"
 
@@ -110,18 +100,16 @@ class TestDecimalCasting(ImpalaTestSuite):
     dec_max = Decimal('{0}.{1}'.format('9' * (precision - scale), '9' * scale))
     # Multiplying large values eith -1 can produce an overflow.
     dec_min = Decimal('-{0}'.format(str(dec_max)))
-    cast = self._normalize_cast_expr(dec_max, scale, vector.get_value('cast_from'))
+    cast = self._normalize_cast_expr(dec_max, precision, vector.get_value('cast_from'))
     # Test max
-    res = Decimal(self.execute_scalar(\
-        cast.format(dec_max, precision, scale, precision, scale)))
+    res = Decimal(self.execute_scalar(cast.format(dec_max, precision, scale)))
     self._assert_decimal_result(cast, res, dec_max)
     # Test Min
-    res = Decimal(self.execute_scalar(\
-        cast.format(dec_min, precision, scale, precision, scale)))
+    res = Decimal(self.execute_scalar(cast.format(dec_min, precision, scale)))
     self._assert_decimal_result(cast, res, dec_min)
     # Test zero
     res = Decimal(self.execute_scalar(\
-        cast.format(TestDecimalCasting.DECIMAL_ZERO, precision, scale, precision, scale)))
+        cast.format(TestDecimalCasting.DECIMAL_ZERO, precision, scale)))
     self._assert_decimal_result(cast, res, TestDecimalCasting.DECIMAL_ZERO)
     # Test NULL
     null_cast = "select cast(NULL as Decimal({0}, {1}))".format(precision, scale)
@@ -136,8 +124,8 @@ class TestDecimalCasting(ImpalaTestSuite):
       pytest.skip("Casting between the same decimal type isn't interesting")
     for i in xrange(self.iterations):
       val = self._gen_decimal_val(precision, scale)
-      cast = self._normalize_cast_expr(val, scale, vector.get_value('cast_from'))\
-          .format(val, precision, scale, precision, scale)
+      cast = self._normalize_cast_expr(val, precision, vector.get_value('cast_from'))\
+          .format(val, precision, scale)
       res = Decimal(self.execute_scalar(cast))
       self._assert_decimal_result(cast, res, val)
 
@@ -149,8 +137,8 @@ class TestDecimalCasting(ImpalaTestSuite):
       # Generate a decimal with a larger precision than the one we're casting to.
       from_precision = randint(precision + 1, 39)
       val = self._gen_decimal_val(from_precision, scale)
-      cast = self._normalize_cast_expr(val, scale, vector.get_value('cast_from'))\
-          .format(val, precision, scale, min(38, from_precision), scale)
+      cast = self._normalize_cast_expr(val, from_precision,\
+          vector.get_value('cast_from')).format(val, precision, scale)
       res = self.execute_scalar(cast)
       self._assert_decimal_result(cast, res, 'NULL')
 
@@ -160,16 +148,19 @@ class TestDecimalCasting(ImpalaTestSuite):
     """
     precision, scale = vector.get_value('decimal_type')
     is_decimal_v2 = vector.get_value('exec_option')['decimal_v2'] == 'true'
+    cast_from = vector.get_value('cast_from')
+
     if precision == scale:
       pytest.skip("Cannot underflow scale when precision and scale are equal")
     for i in xrange(self.iterations):
       from_scale = randint(scale + 1, precision)
       val = self._gen_decimal_val(precision, from_scale)
-      cast = self._normalize_cast_expr(val, scale, vector.get_value('cast_from'))\
-          .format(val, precision, scale, precision, from_scale)
+      cast = self._normalize_cast_expr(val, precision, cast_from)\
+          .format(val, precision, scale)
       res = Decimal(self.execute_scalar(cast, vector.get_value('exec_option')))
-      if is_decimal_v2:
-        expected_val = val.quantize(Decimal('0e-%s' % scale))
+      # TODO: Remove check for cast_from once string to decimal is supported in decimal_v2.
+      if is_decimal_v2 and cast_from != 'string':
+        expected_val = val.quantize(Decimal('0e-%s' % scale), rounding=ROUND_HALF_UP)
       else:
         expected_val = val.quantize(Decimal('0e-%s' % scale), rounding=ROUND_DOWN)
       self._assert_decimal_result(cast, res, expected_val)


[4/5] incubator-impala git commit: Revert "IMPALA-4829: Change default Kudu read behavior for "RYW""

Posted by lv...@apache.org.
Revert "IMPALA-4829: Change default Kudu read behavior for "RYW""

Reverting until we have a fix for KUDU-1869:
Scans do not work with hybrid time disabled and snapshot
reads enabled

This reverts commit 32ff959814646458a34278500bd01fc7741951ce.

Change-Id: I995dec543946c9e0f79bc5b7e82568060a9d8262
Reviewed-on: http://gerrit.cloudera.org:8080/5970
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bd1d445b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bd1d445b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bd1d445b

Branch: refs/heads/master
Commit: bd1d445b37f3cfc56ff868a678caf161b29a9d92
Parents: ffd297b
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Fri Feb 10 14:21:32 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 15 22:45:50 2017 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scanner.cc | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bd1d445b/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 8b6778f..d251eba 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -44,9 +44,8 @@ using kudu::client::KuduScanBatch;
 using kudu::client::KuduSchema;
 using kudu::client::KuduTable;
 
-DEFINE_string(kudu_read_mode, "READ_AT_SNAPSHOT", "(Advanced) Sets the Kudu scan "
-    "ReadMode. Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT. Invalid "
-    "values result in using READ_AT_SNAPSHOT.");
+DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan ReadMode. "
+    "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT.");
 DEFINE_bool(pick_only_leaders_for_tests, false,
             "Whether to pick only leader replicas, for tests purposes only.");
 DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15,
@@ -57,7 +56,7 @@ DECLARE_int32(kudu_operation_timeout_ms);
 
 namespace impala {
 
-const string MODE_READ_LATEST = "READ_LATEST";
+const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
 
 KuduScanner::KuduScanner(KuduScanNode* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
@@ -138,9 +137,9 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token)  {
                          "Could not set replica selection.");
   }
   kudu::client::KuduScanner::ReadMode mode =
-      MODE_READ_LATEST == FLAGS_kudu_read_mode ?
-          kudu::client::KuduScanner::READ_LATEST :
-          kudu::client::KuduScanner::READ_AT_SNAPSHOT;
+      MODE_READ_AT_SNAPSHOT == FLAGS_kudu_read_mode ?
+          kudu::client::KuduScanner::READ_AT_SNAPSHOT :
+          kudu::client::KuduScanner::READ_LATEST;
   KUDU_RETURN_IF_ERROR(scanner_->SetReadMode(mode), "Could not set scanner ReadMode");
   KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms),
       "Could not set scanner timeout");