Posted to commits@impala.apache.org by kw...@apache.org on 2017/04/26 23:22:13 UTC

[01/10] incubator-impala git commit: IMPALA-4544: ASAN should ignore SEGV and leaks

Repository: incubator-impala
Updated Branches:
  refs/heads/master 58b206ff0 -> 6cddb952c


IMPALA-4544: ASAN should ignore SEGV and leaks

The environment variable ASAN_OPTIONS had always been set to
"handle_segv=0 detect_leaks=0" on Jenkins ASAN jobs in the pre-Apache
era, but this had slipped through the cracks.

Change-Id: I055aa25d5ca41419bf3fc8a30a8fb8774ab05b18
Reviewed-on: http://gerrit.cloudera.org:8080/6600
Reviewed-by: Jim Apple <jb...@apache.org>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/72cf359f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/72cf359f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/72cf359f

Branch: refs/heads/master
Commit: 72cf359ff313ff4b79b76ac0089e00ed141c089b
Parents: 58b206f
Author: Jim Apple <jb...@apache.org>
Authored: Sat Apr 8 13:15:27 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Apr 22 20:46:09 2017 +0000

----------------------------------------------------------------------
 bin/run-backend-tests.sh | 1 +
 bin/start-catalogd.sh    | 1 +
 bin/start-impalad.sh     | 1 +
 bin/start-statestored.sh | 1 +
 4 files changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/72cf359f/bin/run-backend-tests.sh
----------------------------------------------------------------------
diff --git a/bin/run-backend-tests.sh b/bin/run-backend-tests.sh
index 726db73..74f457d 100755
--- a/bin/run-backend-tests.sh
+++ b/bin/run-backend-tests.sh
@@ -37,6 +37,7 @@ cd ${IMPALA_BE_DIR}
 cd ..
 
 export CTEST_OUTPUT_ON_FAILURE=1
+export ASAN_OPTIONS="handle_segv=0 detect_leaks=0"
 export UBSAN_OPTIONS="print_stacktrace=1"
 UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/72cf359f/bin/start-catalogd.sh
----------------------------------------------------------------------
diff --git a/bin/start-catalogd.sh b/bin/start-catalogd.sh
index 18d2c6a..41da5df 100755
--- a/bin/start-catalogd.sh
+++ b/bin/start-catalogd.sh
@@ -70,6 +70,7 @@ if ${CLUSTER_DIR}/admin is_kerberized; then
 fi
 
 . ${IMPALA_HOME}/bin/set-classpath.sh
+export ASAN_OPTIONS="handle_segv=0 detect_leaks=0"
 export UBSAN_OPTIONS="print_stacktrace=1"
 UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/72cf359f/bin/start-impalad.sh
----------------------------------------------------------------------
diff --git a/bin/start-impalad.sh b/bin/start-impalad.sh
index a02b0ce..8eab812 100755
--- a/bin/start-impalad.sh
+++ b/bin/start-impalad.sh
@@ -103,6 +103,7 @@ if ${CLUSTER_DIR}/admin is_kerberized; then
 fi
 
 . ${IMPALA_HOME}/bin/set-classpath.sh
+export ASAN_OPTIONS="handle_segv=0 detect_leaks=0"
 export UBSAN_OPTIONS="print_stacktrace=1"
 UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/72cf359f/bin/start-statestored.sh
----------------------------------------------------------------------
diff --git a/bin/start-statestored.sh b/bin/start-statestored.sh
index 33f0dc1..d1eebca 100755
--- a/bin/start-statestored.sh
+++ b/bin/start-statestored.sh
@@ -59,6 +59,7 @@ if ${CLUSTER_DIR}/admin is_kerberized; then
   fi
 fi
 
+export ASAN_OPTIONS="handle_segv=0 detect_leaks=0"
 export UBSAN_OPTIONS="print_stacktrace=1"
 UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"


[05/10] incubator-impala git commit: IMPALA-3079: Fix sequence file writer

Posted by kw...@apache.org.
IMPALA-3079: Fix sequence file writer

This change fixes the following issues in the Sequence File Writer:
1. ReadWriteUtil::VLongRequiredBytes() and ReadWriteUtil::PutVLong()
   were broken. As a result, Impala created corrupt uncompressed
   sequence files.

2. KEY_CLASS_NAME was missing from the sequence file header. As a
   result, Hive could not read back uncompressed sequence files
   created by Impala.

3. Impala created record-compressed sequence files with an empty keys
   block. As a result, Hive could not read back record-compressed
   sequence files created by Impala.

4. Impala created block-compressed files with:
   - empty key-lengths block
   - empty keys block
   - empty value-lengths block
   This resulted in invalid block-compressed sequence files that Hive could
   not read back.

5. In some cases the wrong Record-compression flag was written to the
   sequence file header. As a result, Hive could not read back record-
   compressed sequence files created by Impala.

6. Impala added 'sync_marker' instead of 'neg1_sync_marker' to the
   beginning of blocks in block-compressed sequence files. Hive could
   not read these files back.

7. The calculation of block sizes in SnappyBlockCompressor class was
   incorrect for odd-length buffers.

Change-Id: I0db642ad35132a9a5a6611810a6cafbbe26e7487
Reviewed-on: http://gerrit.cloudera.org:8080/6107
Reviewed-by: Michael Ho <kw...@cloudera.com>
Reviewed-by: Attila Jeges <at...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/59b2db6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/59b2db6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/59b2db6b

Branch: refs/heads/master
Commit: 59b2db6ba722e5bef297bb4603519e06333ce5cb
Parents: edcc593
Author: Attila Jeges <at...@cloudera.com>
Authored: Mon Feb 20 18:07:25 2017 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Apr 25 21:07:53 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-sequence-table-writer.cc       | 105 +++++++---
 be/src/exec/hdfs-sequence-table-writer.h        |  71 ++++++-
 be/src/exec/read-write-util-test.cc             |  57 ++++++
 be/src/exec/read-write-util.h                   |  64 +++++--
 be/src/util/compress.cc                         |  35 ++--
 be/src/util/decompress-test.cc                  |   9 +-
 .../queries/QueryTest/seq-writer.test           | 192 ++++++++++++++++++-
 tests/query_test/test_compressed_formats.py     |  39 +++-
 8 files changed, 495 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index 3c522ba..9af7e0f 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -38,8 +38,10 @@
 
 namespace impala {
 
-uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
+const uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
 const char* HdfsSequenceTableWriter::VALUE_CLASS_NAME = "org.apache.hadoop.io.Text";
+const char* HdfsSequenceTableWriter::KEY_CLASS_NAME =
+    "org.apache.hadoop.io.BytesWritable";
 
 HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent,
                         RuntimeState* state, OutputPartition* output,
@@ -130,24 +132,25 @@ Status HdfsSequenceTableWriter::AppendRows(
 }
 
 Status HdfsSequenceTableWriter::WriteFileHeader() {
-  out_.WriteBytes(sizeof(SEQ6_CODE), reinterpret_cast<uint8_t*>(SEQ6_CODE));
+  out_.WriteBytes(sizeof(SEQ6_CODE), SEQ6_CODE);
 
-  // Output an empty KeyClassName field
-  out_.WriteEmptyText();
+  // Setup to be correct key class
+  out_.WriteText(strlen(KEY_CLASS_NAME),
+      reinterpret_cast<const uint8_t*>(KEY_CLASS_NAME));
 
   // Setup to be correct value class
   out_.WriteText(strlen(VALUE_CLASS_NAME),
-                 reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME));
+      reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME));
 
   // Flag for if compression is used
   out_.WriteBoolean(compress_flag_);
   // Only valid if compression is used. Indicates if block compression is used.
-  out_.WriteBoolean(!record_compression_);
+  out_.WriteBoolean(compress_flag_ && !record_compression_);
 
   // Output the name of our compression codec, parsed by readers
   if (compress_flag_) {
     out_.WriteText(codec_name_.size(),
-                   reinterpret_cast<const uint8_t*>(codec_name_.data()));
+        reinterpret_cast<const uint8_t*>(codec_name_.data()));
   }
 
   // Meta data is formated as an integer N followed by N*2 strings,
@@ -164,35 +167,63 @@ Status HdfsSequenceTableWriter::WriteFileHeader() {
 }
 
 Status HdfsSequenceTableWriter::WriteCompressedBlock() {
-  WriteStream header;
+  WriteStream record;
+  uint8_t *output;
+  int64_t output_length;
   DCHECK(compress_flag_);
 
-  // add a sync marker to start of the block
-  header.WriteBytes(sync_marker_.size(), sync_marker_.data());
+  // Add a sync marker to start of the block
+  record.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
 
-  header.WriteVLong(unflushed_rows_);
+  // Output the number of rows in this block
+  record.WriteVLong(unflushed_rows_);
 
-  // Write Key Lengths and Key Values
-  header.WriteEmptyText();
-  header.WriteEmptyText();
+  // Output compressed key-lengths block-size & compressed key-lengths block.
+  // The key-lengths block contains byte value of 4 as a key length for each row (this is
+  // what Hive does).
+  string key_lengths_text(unflushed_rows_, '\x04');
+  {
+    SCOPED_TIMER(parent_->compress_timer());
+    RETURN_IF_ERROR(compressor_->ProcessBlock(false, key_lengths_text.size(),
+        reinterpret_cast<uint8_t*>(&key_lengths_text[0]), &output_length, &output));
+  }
+  record.WriteVInt(output_length);
+  record.WriteBytes(output_length, output);
 
-  // Output an Empty string for value Lengths
-  header.WriteEmptyText();
+  // Output compressed keys block-size & compressed keys block.
+  // The keys block contains "\0\0\0\0" byte sequence as a key for each row (this is what
+  // Hive does).
+  string keys_text(unflushed_rows_ * 4, '\0');
+  {
+    SCOPED_TIMER(parent_->compress_timer());
+    RETURN_IF_ERROR(compressor_->ProcessBlock(false, keys_text.size(),
+        reinterpret_cast<uint8_t*>(&keys_text[0]), &output_length, &output));
+  }
+  record.WriteVInt(output_length);
+  record.WriteBytes(output_length, output);
 
-  uint8_t *output;
-  int64_t output_length;
+  // Output compressed value-lengths block-size & compressed value-lengths block
+  string value_lengths_text = out_value_lengths_block_.String();
+  {
+    SCOPED_TIMER(parent_->compress_timer());
+    RETURN_IF_ERROR(compressor_->ProcessBlock(false, value_lengths_text.size(),
+        reinterpret_cast<uint8_t*>(&value_lengths_text[0]), &output_length, &output));
+  }
+  record.WriteVInt(output_length);
+  record.WriteBytes(output_length, output);
+
+  // Output compressed values block-size & compressed values block
   string text = out_.String();
   {
     SCOPED_TIMER(parent_->compress_timer());
     RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
         reinterpret_cast<uint8_t*>(&text[0]), &output_length, &output));
   }
+  record.WriteVInt(output_length);
+  record.WriteBytes(output_length, output);
 
-  header.WriteVInt(output_length);
-  string head = header.String();
-  RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(head.data()),
-                        head.size()));
-  RETURN_IF_ERROR(Write(output, output_length));
+  string rec = record.String();
+  RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(rec.data()), rec.size()));
   return Status::OK();
 }
 
@@ -237,11 +268,15 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
   ++unflushed_rows_;
   row_buf_.Clear();
   if (compress_flag_ && !record_compression_) {
-    // Output row for a block compressed sequence file
-    // write the length as a vlong and then write the contents
+    // Output row for a block compressed sequence file.
+    // Value block: Write the length as a vlong and then write the contents.
     EncodeRow(row, &row_buf_);
     out_.WriteVLong(row_buf_.Size());
     out_.WriteBytes(row_buf_.Size(), row_buf_.String().data());
+    // Value-lengths block: Write the number of bytes we have just written to out_ as
+    // vlong
+    out_value_lengths_block_.WriteVLong(
+        ReadWriteUtil::VLongRequiredBytes(row_buf_.Size()) + row_buf_.Size());
     return Status::OK();
   }
 
@@ -249,7 +284,7 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
 
   const uint8_t* value_bytes;
   int64_t value_length;
-  if (record_compression_) {
+  if (compress_flag_) {
     // apply compression to row_buf_
     // the length of the buffer must be prefixed to the buffer prior to compression
     //
@@ -275,16 +310,22 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
   int rec_len = value_length;
   // if the record is compressed, the length is part of the compressed text
   // if not, then we need to write the length (below) and account for it's size
-  if (!record_compression_) rec_len += ReadWriteUtil::VLongRequiredBytes(value_length);
+  if (!compress_flag_) {
+    rec_len += ReadWriteUtil::VLongRequiredBytes(value_length);
+  }
+  // The record contains the key, account for it's size (we use "\0\0\0\0" byte sequence
+  // as a key just like Hive).
+  rec_len += 4;
 
-  // Length of the record (incl. key length and value length)
+  // Length of the record (incl. key and value length)
   out_.WriteInt(rec_len);
 
-  // Write length of the key (Impala/Hive doesn't write a key)
-  out_.WriteInt(0);
+  // Write length of the key and the key
+  out_.WriteInt(4);
+  out_.WriteBytes(4, "\0\0\0\0");
 
   // if the record is compressed, the length is part of the compressed text
-  if (!record_compression_) out_.WriteVLong(value_length);
+  if (!compress_flag_) out_.WriteVLong(value_length);
 
   // write out the value (possibly compressed)
   out_.WriteBytes(value_length, value_bytes);
@@ -304,6 +345,8 @@ Status HdfsSequenceTableWriter::Flush() {
         Write(reinterpret_cast<const uint8_t*>(out_str.data()), out_str.size()));
   }
   out_.Clear();
+  out_value_lengths_block_.Clear();
+  mem_pool_->FreeAll();
   unflushed_rows_ = 0;
   return Status::OK();
 }
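
The record layout that ConsumeRow() now produces for uncompressed files can be sketched as follows. This is a minimal, standalone illustration and not the Impala code: PutInt32BE, PutLengthVLong and BuildUncompressedRecord are hypothetical helpers, and the sketch assumes that WriteInt() emits 4-byte big-endian integers as Hadoop's DataOutput does.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Appends a 4-byte big-endian integer (assumed to mirror WriteStream::WriteInt).
inline void PutInt32BE(uint32_t v, std::string* out) {
  for (int shift = 24; shift >= 0; shift -= 8) {
    out->push_back(static_cast<char>((v >> shift) & 0xFF));
  }
}

// Appends a non-negative length in Hadoop's zero-compressed VLong encoding.
inline void PutLengthVLong(uint64_t v, std::string* out) {
  if (v <= 127) {
    out->push_back(static_cast<char>(v));
    return;
  }
  int payload = 0;
  for (uint64_t t = v; t != 0; t >>= 8) ++payload;
  // First byte is -(112 + payload) for positive values.
  out->push_back(static_cast<char>(static_cast<uint8_t>(-(112 + payload))));
  for (int i = payload - 1; i >= 0; --i) {
    out->push_back(static_cast<char>((v >> (8 * i)) & 0xFF));
  }
}

// Hypothetical helper: builds one uncompressed record for 'value'.
// Layout: record length | key length (4) | key "\0\0\0\0" | vlong(len) | value
inline std::string BuildUncompressedRecord(const std::string& value) {
  std::string vlen;
  PutLengthVLong(value.size(), &vlen);
  std::string rec;
  // The record length covers the 4-byte key plus the length-prefixed value.
  PutInt32BE(static_cast<uint32_t>(4 + vlen.size() + value.size()), &rec);
  PutInt32BE(4, &rec);
  rec.append(4, '\0');
  rec += vlen;
  rec += value;
  return rec;
}
```

For a 3-byte value the record length field is 4 + 1 + 3 = 8, and the whole record occupies 16 bytes once the two 4-byte integer fields are counted.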

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/hdfs-sequence-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h
index 7f6a888..3d72926 100644
--- a/be/src/exec/hdfs-sequence-table-writer.h
+++ b/be/src/exec/hdfs-sequence-table-writer.h
@@ -36,6 +36,66 @@ class RuntimeState;
 struct StringValue;
 struct OutputPartition;
 
+/// Sequence files are flat files consisting of binary key/value pairs. Essentially there
+/// are 3 different formats for sequence files depending on the 'compression_codec' and
+/// 'seq_compression_mode' query options:
+/// - Uncompressed sequence file format
+/// - Record-compressed sequence file format
+/// - Block-compressed sequence file format
+/// All of them share a common header described below.
+///
+/// Sequence File Header
+/// --------------------
+/// - version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number
+///   (e.g. SEQ4 or SEQ6)
+/// - keyClassName - key class
+/// - valueClassName - value class
+/// - compression - A boolean which specifies if compression is turned on for keys/values
+///   in this file.
+/// - blockCompression - A boolean which specifies if block-compression is turned on for
+///   keys/values in this file.
+/// - compression codec - compression codec class which is used for compression of keys
+///   and/or values (if compression is enabled).
+/// - metadata - SequenceFile.Metadata for this file.
+/// - sync - A 16 byte sync marker to denote end of the header.
+///
+/// Uncompressed Sequence File Format
+/// ---------------------------------
+/// - Header
+/// - Record
+///   - Record length
+///   - Key length
+///   - Key
+///   - Value
+/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so.
+///
+/// Record-Compressed Sequence File Format
+/// --------------------------------------
+/// - Header
+/// - Record
+///   - Record length
+///   - Key length
+///   - Key
+///   - Compressed Value
+/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so.
+///
+/// Block-Compressed Sequence File Format
+/// -------------------------------------
+/// - Header
+/// - Record Block
+///   - Uncompressed number of records in the block
+///   - Compressed key-lengths block-size
+///   - Compressed key-lengths block
+///   - Compressed keys block-size
+///   - Compressed keys block
+///   - Compressed value-lengths block-size
+///   - Compressed value-lengths block
+///   - Compressed values block-size
+///   - Compressed values block
+/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every block.
+/// The compressed blocks of key lengths and value lengths consist of the actual lengths
+/// of individual keys/values encoded in zero-compressed integer format.
+
 /// Consumes rows and outputs the rows into a sequence file in HDFS
 /// Output is buffered to fill sequence file blocks.
 class HdfsSequenceTableWriter : public HdfsTableWriter {
@@ -67,7 +127,8 @@ class HdfsSequenceTableWriter : public HdfsTableWriter {
   /// writes the SEQ file header to HDFS
   Status WriteFileHeader();
 
-  /// writes the contents of out_ as a single compressed block
+  /// writes the contents of out_value_lengths_block_ and out_ as a single
+  /// block-compressed record.
   Status WriteCompressedBlock();
 
   /// writes the tuple row to the given buffer; separates fields by field_delim_,
@@ -88,6 +149,10 @@ class HdfsSequenceTableWriter : public HdfsTableWriter {
   /// buffer which holds accumulated output
   WriteStream out_;
 
+  /// buffer which holds accumulated value-lengths output (used with block-compressed
+  /// sequence files)
+  WriteStream out_value_lengths_block_;
+
   /// Temporary Buffer for a single row
   WriteStream row_buf_;
 
@@ -119,10 +184,12 @@ class HdfsSequenceTableWriter : public HdfsTableWriter {
   /// A -1 infront of the sync marker, used in decompressed formats
   std::string neg1_sync_marker_;
 
+  /// Name of java class to use when reading the keys
+  static const char* KEY_CLASS_NAME;
   /// Name of java class to use when reading the values
   static const char* VALUE_CLASS_NAME;
   /// Magic characters used to identify the file type
-  static uint8_t SEQ6_CODE[4];
+  static const uint8_t SEQ6_CODE[4];
 };
 
 } // namespace impala
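
The four blocks of a block-compressed record, as described in the header comment above, can be sketched before compression like this. It is an illustration under assumptions, not the Impala implementation: BlockBuffers, AppendLengthVLong and BuildBlockBuffers are hypothetical names, and the compression step and sync marker are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Uncompressed contents of the four blocks of one block-compressed record.
struct BlockBuffers {
  std::string key_lengths;    // one byte 0x04 per row
  std::string keys;           // "\0\0\0\0" per row
  std::string value_lengths;  // vlong(size of each entry in 'values')
  std::string values;         // vlong(row length) followed by row bytes
};

// Appends a non-negative length as a zero-compressed VLong; returns its size.
inline int AppendLengthVLong(uint64_t v, std::string* out) {
  if (v <= 127) {
    out->push_back(static_cast<char>(v));
    return 1;
  }
  int payload = 0;
  for (uint64_t t = v; t != 0; t >>= 8) ++payload;
  out->push_back(static_cast<char>(static_cast<uint8_t>(-(112 + payload))));
  for (int i = payload - 1; i >= 0; --i) {
    out->push_back(static_cast<char>((v >> (8 * i)) & 0xFF));
  }
  return payload + 1;
}

// Hypothetical helper mirroring what ConsumeRow()/WriteCompressedBlock()
// accumulate for a batch of already-encoded rows.
inline BlockBuffers BuildBlockBuffers(const std::vector<std::string>& rows) {
  BlockBuffers b;
  b.key_lengths.assign(rows.size(), '\x04');
  b.keys.assign(rows.size() * 4, '\0');
  for (const std::string& row : rows) {
    int prefix = AppendLengthVLong(row.size(), &b.values);
    b.values += row;
    // Each value-lengths entry counts the length prefix plus the row bytes.
    AppendLengthVLong(prefix + row.size(), &b.value_lengths);
  }
  return b;
}
```

In the patch each of these buffers is then passed through the compressor and written as a VInt size followed by the compressed bytes.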

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/read-write-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util-test.cc b/be/src/exec/read-write-util-test.cc
index 0f9c6ae..e4448de 100644
--- a/be/src/exec/read-write-util-test.cc
+++ b/be/src/exec/read-write-util-test.cc
@@ -56,6 +56,63 @@ TEST(ReadWriteUtil, BigEndian) {
   TestBigEndian<uint64_t>(0xffffffffffffff);
 }
 
+TEST(ReadWriteUtil, ZeroCompressedLongRequiredBytes) {
+  // Small longs stored in 1 byte
+  for (int64_t val = -112; val <= 127; val++) {
+    EXPECT_EQ(1, ReadWriteUtil::VLongRequiredBytes(val));
+  }
+  // Small longs stored in 2 bytes
+  for (int64_t val = -128; val < -112; val++) {
+    EXPECT_EQ(2, ReadWriteUtil::VLongRequiredBytes(val));
+  }
+  // Positive longs stored in 3-9 bytes
+  int64_t val = 0x7000ab00cd00ef00;
+  for (int sh = 0; sh <= 6; sh++) {
+    EXPECT_EQ(9 - sh, ReadWriteUtil::VLongRequiredBytes(val));
+    val = val >> 8;
+  }
+  // Negative longs stored 3-9 bytes
+  val = 0x8000ab00cd00ef00;
+  for (int sh = 0; sh <= 6; sh++) {
+    EXPECT_EQ(9 - sh, ReadWriteUtil::VLongRequiredBytes(val));
+    val = val >> 8;
+  }
+  //Max/min long is stored in 9 bytes
+  EXPECT_EQ(9, ReadWriteUtil::VLongRequiredBytes(0x7fffffffffffffff));
+  EXPECT_EQ(9, ReadWriteUtil::VLongRequiredBytes(0x8000000000000000));
+}
+
+void TestPutGetZeroCompressedLong(int64_t val) {
+  uint8_t buffer[9];
+  int64_t read_val;
+  int64_t num_bytes = ReadWriteUtil::PutVLong(val, buffer);
+  int64_t read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val);
+  EXPECT_EQ(read_bytes, num_bytes);
+  EXPECT_EQ(read_val, val);
+}
+
+TEST(ReadWriteUtil, ZeroCompressedLong) {
+  //1 byte longs
+  for (int64_t val = -128; val <= 127; val++) {
+    TestPutGetZeroCompressedLong(val);
+  }
+  //2+ byte positive longs
+  int64_t val = 0x70100000200000ab;
+  for (int sh = 0; sh <= 6; sh++) {
+    TestPutGetZeroCompressedLong(val);
+    val = val >> 8;
+  }
+  //2+ byte negative longs
+  val = 0x80100000200000ab;
+  for (int sh = 0; sh <= 6; sh++) {
+    TestPutGetZeroCompressedLong(val);
+    val = val >> 8;
+  }
+  //Max/min long
+  TestPutGetZeroCompressedLong(0x7fffffffffffffff);
+  TestPutGetZeroCompressedLong(0x8000000000000000);
+}
+
 }
 
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/read-write-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util.h b/be/src/exec/read-write-util.h
index c384839..84d41dd 100644
--- a/be/src/exec/read-write-util.h
+++ b/be/src/exec/read-write-util.h
@@ -60,11 +60,11 @@ class ReadWriteUtil {
   static int GetVInt(uint8_t* buf, int32_t* vint);
 
   /// Writes a variable-length Long or int value to a byte buffer.
-  /// Returns the number of bytes written
+  /// Returns the number of bytes written.
   static int64_t PutVLong(int64_t val, uint8_t* buf);
   static int64_t PutVInt(int32_t val, uint8_t* buf);
 
-  /// returns size of the encoded long value, not including the 1 byte for length
+  /// Returns size of the encoded long value, including the 1 byte for length.
   static int VLongRequiredBytes(int64_t val);
 
   /// Read a variable-length Long value from a byte buffer starting at the specified
@@ -211,41 +211,63 @@ inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong)
   return len;
 }
 
+// Returns size of the encoded long value, including the 1 byte for length for val < -112
+// or val > 127.
 inline int ReadWriteUtil::VLongRequiredBytes(int64_t val) {
-  // returns size of the encoded long value, not including the 1 byte for length
-  if (val & 0xFF00000000000000llu) return 8;
-  if (val & 0x00FF000000000000llu) return 7;
-  if (val & 0x0000FF0000000000llu) return 6;
-  if (val & 0x000000FF00000000llu) return 5;
-  if (val & 0x00000000FF000000llu) return 4;
-  if (val & 0x0000000000FF0000llu) return 3;
-  if (val & 0x000000000000FF00llu) return 2;
-  // Values between -112 and 127 are stored using 1 byte,
-  // values between -127 and -112 are stored using 2 bytes
-  // See ReadWriteUtil::DecodeVIntSize for this case
-  if (val < -112) return 2;
-  return 1;
+  if (val >= -112 && val <= 127) return 1;
+  // If 'val' is negtive, take the one's complement.
+  if (val < 0) val = ~val;
+  return 9 - __builtin_clzll(val)/8;
 }
 
+// Serializes 'val' to a binary stream with zero-compressed encoding. For -112<=val<=127,
+// only one byte is used with the actual value. For other values of 'val', the first byte
+// value indicates whether the long is positive or negative, and the number of bytes that
+// follow. If the first byte value v is between -113 and -120, the following long is
+// positive, with number of bytes that follow are -(v+112). If the first byte value v is
+// between -121 and -128, the following long is negative, with number of bytes that follow
+// are -(v+120). Bytes are stored in the high-non-zero-byte-first order. Returns the
+// number of bytes written.
+// For more information, see the documentation for 'WritableUtils.writeVLong()' method:
+// https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/WritableUtils.html
 inline int64_t ReadWriteUtil::PutVLong(int64_t val, uint8_t* buf) {
   int64_t num_bytes = VLongRequiredBytes(val);
 
   if (num_bytes == 1) {
+    DCHECK(val >= -112 && val <= 127);
     // store the value itself instead of the length
     buf[0] = static_cast<int8_t>(val);
     return 1;
   }
 
   // This is how we encode the length for a length less than or equal to 8
-  buf[0] = -119 + num_bytes;
+  DCHECK_GE(num_bytes, 2);
+  DCHECK_LE(num_bytes, 9);
+  if (val < 0) {
+    DCHECK_LT(val, -112);
+    // The first byte in 'buf' should contain a value between -121 and -128 that makes the
+    // following condition true: -(buf[0] + 120) == num_bytes - 1.
+    // Note that 'num_bytes' includes the 1 extra byte for length.
+    buf[0] = -(num_bytes + 119);
+    // If 'val' is negtive, take the one's complement.
+    // See the source code for WritableUtils.writeVLong() method:
+    // https://hadoop.apache.org/docs/r2.7.2/api/src-html/org/apache/hadoop/io/
+    // WritableUtils.html#line.271
+    val = ~val;
+  } else {
+    DCHECK_GT(val, 127);
+    // The first byte in 'buf' should contain a value between -113 and -120 that makes the
+    // following condition true: -(buf[0] + 112) == num_bytes - 1.
+    // Note that 'num_bytes' includes the 1 extra byte for length.
+    buf[0] = -(num_bytes + 111);
+  }
 
-  // write to buffer in reversed endianness
-  for (int i = 0; i < num_bytes; ++i) {
-    buf[i+1] = (val >> (8 * (num_bytes - i - 1))) & 0xFF;
+  // write to the buffer in Big Endianness
+  for (int i = 1; i < num_bytes; ++i) {
+    buf[i] = (val >> (8 * (num_bytes - i - 1))) & 0xFF;
   }
 
-  // +1 for the length byte
-  return num_bytes + 1;
+  return num_bytes;
 }
 
 inline int64_t ReadWriteUtil::PutVInt(int32_t val, uint8_t* buf) {
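
The zero-compressed encoding described in the comments above can be exercised in isolation with a small round-trip sketch. The functions VLongSize, EncodeVLong and DecodeVLong are hypothetical stand-ins written for this illustration; they follow the rules stated in the patch comments (and Hadoop's WritableUtils.writeVLong()) but are not the ReadWriteUtil code itself.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Number of bytes the encoding of 'val' occupies, including the leading
// length/sign byte when one is needed.
inline int VLongSize(int64_t val) {
  if (val >= -112 && val <= 127) return 1;
  // Negative values are stored as their one's complement.
  uint64_t u = static_cast<uint64_t>(val < 0 ? ~val : val);
  int payload = 0;
  while (u != 0) {
    ++payload;
    u >>= 8;
  }
  return payload + 1;
}

// Appends the encoding of 'val' to 'out' and returns the bytes written.
inline int EncodeVLong(int64_t val, std::vector<uint8_t>* out) {
  int size = VLongSize(val);
  if (size == 1) {
    out->push_back(static_cast<uint8_t>(val));
    return 1;
  }
  int payload = size - 1;
  // First byte: -113..-120 for positive values, -121..-128 for negative ones.
  int first = (val < 0) ? -(120 + payload) : -(112 + payload);
  out->push_back(static_cast<uint8_t>(first));
  uint64_t u = static_cast<uint64_t>(val < 0 ? ~val : val);
  // Payload bytes go out most significant byte first.
  for (int i = payload - 1; i >= 0; --i) {
    out->push_back(static_cast<uint8_t>((u >> (8 * i)) & 0xFF));
  }
  return size;
}

// Decodes one value starting at buf[0]; returns the bytes consumed.
inline int DecodeVLong(const uint8_t* buf, int64_t* val) {
  int8_t first = static_cast<int8_t>(buf[0]);
  if (first >= -112) {
    *val = first;
    return 1;
  }
  bool negative = first < -120;
  int payload = negative ? -(first + 120) : -(first + 112);
  uint64_t u = 0;
  for (int i = 1; i <= payload; ++i) u = (u << 8) | buf[i];
  *val = negative ? ~static_cast<int64_t>(u) : static_cast<int64_t>(u);
  return payload + 1;
}
```

For example, 128 encodes as the two bytes 0x8F 0x80: the first byte is -113 (one payload byte, positive) and the second is the value itself.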

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/util/compress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index 429ae66..efa39bf 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -204,13 +204,27 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
     int64_t input_length, const uint8_t* input, int64_t *output_length,
     uint8_t** output) {
   DCHECK_GE(input_length, 0);
-  // Hadoop uses a block compression scheme on top of snappy.  First there is
-  // an integer which is the size of the decompressed data followed by a
-  // sequence of compressed blocks each preceded with an integer size.
-  // For testing purposes we are going to generate two blocks.
-  int64_t block_size = input_length / 2;
-  size_t length = snappy::MaxCompressedLength(block_size) * 2;
-  length += 3 * sizeof (int32_t);
+  // Hadoop uses a block compression scheme on top of snappy. The layout is as follows:
+  // - size of the entire decompressed data (4 bytes)
+  // - size of the 1st compressed block (4 bytes)
+  // - 1st compressed block
+  // - size of the 2nd compressed block (4 bytes)
+  // - 2nd compressed block
+  // ...
+  // For testing purposes we are going to generate two blocks if input_length >= 4K.
+  vector<int64_t> block_sizes;
+  size_t length;
+  if (input_length == 0) {
+    length = sizeof (int32_t);
+  } else if (input_length < 4 * 1024) {
+    block_sizes.push_back(input_length);
+    length = snappy::MaxCompressedLength(block_sizes[0]) + 2 * sizeof (int32_t);
+  } else {
+    block_sizes.push_back(input_length / 2);
+    block_sizes.push_back(input_length - block_sizes[0]);
+    length = snappy::MaxCompressedLength(block_sizes[0]) +
+        snappy::MaxCompressedLength(block_sizes[1]) + 3 * sizeof (int32_t);
+  }
   DCHECK(!output_preallocated || length <= *output_length);
 
   if (output_preallocated) {
@@ -222,13 +236,12 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
   }
 
   uint8_t* outp = out_buffer_;
-  uint8_t* sizep;
   ReadWriteUtil::PutInt(outp, static_cast<uint32_t>(input_length));
   outp += sizeof (int32_t);
-  while (input_length > 0) {
+  for (int64_t block_size: block_sizes) {
     // TODO: should this be a while or a do-while loop? Check what Hadoop does.
     // Point at the spot to store the compressed size.
-    sizep = outp;
+    uint8_t* sizep = outp;
     outp += sizeof (int32_t);
     size_t size;
     snappy::RawCompress(reinterpret_cast<const char*>(input),
@@ -236,8 +249,8 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
 
     ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size));
     input += block_size;
-    input_length -= block_size;
     outp += size;
+    DCHECK_LE(outp - out_buffer_, length);
   }
 
   *output = out_buffer_;
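
The block-size selection introduced in this hunk can be looked at on its own. The sketch below reproduces only the splitting rule and the fixed framing overhead; SplitIntoBlocks and FramingBytes are hypothetical names, and the snappy calls are deliberately omitted so the example has no library dependency. The removed code derived a single block_size of input_length / 2 and looped until input_length reached zero, which for odd lengths appears to leave a remainder that triggers an extra iteration.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Sizes of the uncompressed blocks the compressor will emit:
// none for empty input, one below 4K, otherwise two that sum to the input.
inline std::vector<int64_t> SplitIntoBlocks(int64_t input_length) {
  std::vector<int64_t> sizes;
  if (input_length == 0) return sizes;
  if (input_length < 4 * 1024) {
    sizes.push_back(input_length);
    return sizes;
  }
  sizes.push_back(input_length / 2);
  // The second block takes whatever is left, so odd lengths are covered.
  sizes.push_back(input_length - sizes[0]);
  return sizes;
}

// Bytes of framing: one 4-byte total length plus a 4-byte size per block.
inline int64_t FramingBytes(const std::vector<int64_t>& sizes) {
  return 4 * (1 + static_cast<int64_t>(sizes.size()));
}
```

With an input of 4097 bytes this yields blocks of 2048 and 2049 bytes and 12 bytes of framing, so the block sizes always add up to the input length.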

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 7709bb1..1f84bad 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -73,8 +73,15 @@ class DecompressorTest : public ::testing::Test {
       DecompressInsufficientOutputBuffer(compressor.get(), decompressor.get(),
           sizeof(input_), input_);
     } else {
-      CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_),
+      CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_), input_);
+      // Test with odd-length input (to test the calculation of block-sizes in
+      // SnappyBlockCompressor)
+      CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_) - 1,
           input_);
+      // Test with input length of 1024 (to test SnappyBlockCompressor with a single
+      // block)
+      CompressAndDecompress(compressor.get(), decompressor.get(), 1024, input_);
+      // Test with empty input
       if (format != THdfsCompression::BZIP2) {
         CompressAndDecompress(compressor.get(), decompressor.get(), 0, NULL);
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
index fe4d829..528b83e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
@@ -1,8 +1,5 @@
 ====
 ---- QUERY
-drop table if exists __seq_write;
-====
----- QUERY
 SET COMPRESSION_CODEC=NONE;
 SET ALLOW_UNSUPPORTED_FORMATS=1;
 SET SEQ_COMPRESSION_MODE=BLOCK;
@@ -92,5 +89,192 @@ select * from __seq_write;
 INT,STRING,DOUBLE
 ====
 ---- QUERY
-drop table __seq_write;
+# IMPALA-3079: Create a table containing larger seq files with NONE+RECORD and then read
+# it back
+SET COMPRESSION_CODEC=NONE;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_none_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_none_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_none_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with DEFAULT+RECORD and then
+# read it back
+SET COMPRESSION_CODEC=DEFAULT;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_def_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_def_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_def_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+RECORD and
+# then read it back
+SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snapb_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snapb_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snapb_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY+RECORD and then read
+# it back
+SET COMPRESSION_CODEC=SNAPPY;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snap_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snap_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snap_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with GZIP+RECORD and then read
+# it back
+SET COMPRESSION_CODEC=GZIP;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_gzip_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_gzip_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_gzip_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with NONE+BLOCK and then read it
+# back
+SET COMPRESSION_CODEC=NONE;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_none_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_none_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_none_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with DEFAULT+BLOCK and then read
+# it back
+SET COMPRESSION_CODEC=DEFAULT;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_def_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_def_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_def_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+BLOCK and
+# then read it back
+SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snapb_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snapb_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snapb_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY+BLOCK and then read
+# it back
+SET COMPRESSION_CODEC=SNAPPY;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snap_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snap_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snap_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with GZIP+BLOCK and then read it
+# back
+SET COMPRESSION_CODEC=GZIP;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_gzip_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_gzip_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_gzip_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 654ea48..36dc427 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -144,18 +144,43 @@ class TestTableWriters(ImpalaTestSuite):
         (v.get_value('table_format').file_format =='text' and
         v.get_value('table_format').compression_codec == 'none'))
 
-  def test_seq_writer(self, vector):
-    # TODO debug this test, same as seq writer.
-    # This caused by a zlib failure. Suspected cause is too small a buffer
-    # passed to zlib for compression; similar to IMPALA-424
-    pytest.skip()
-    self.run_test_case('QueryTest/seq-writer', vector)
+  def test_seq_writer(self, vector, unique_database):
+    self.run_test_case('QueryTest/seq-writer', vector, unique_database)
+
+  def test_seq_writer_hive_compatibility(self, vector, unique_database):
+    self.client.execute('set ALLOW_UNSUPPORTED_FORMATS=1')
+    # Write sequence files with different compression codec/compression mode and then read
+    # it back in Impala and Hive.
+    # Note that we don't test snappy here as the snappy codec used by Impala does not seem
+    # to be fully compatible with the snappy codec used by Hive.
+    for comp_codec, comp_mode in [('NONE', 'RECORD'), ('NONE', 'BLOCK'),
+                                  ('DEFAULT', 'RECORD'), ('DEFAULT', 'BLOCK'),
+                                  ('GZIP', 'RECORD'), ('GZIP', 'BLOCK')]:
+      table_name = '%s.seq_tbl_%s_%s' % (unique_database, comp_codec, comp_mode)
+      self.client.execute('set COMPRESSION_CODEC=%s' % comp_codec)
+      self.client.execute('set SEQ_COMPRESSION_MODE=%s' % comp_mode)
+      self.client.execute('create table %s like functional.zipcode_incomes stored as '
+          'sequencefile' % table_name)
+      # Write sequence file of size greater than 4K
+      self.client.execute('insert into %s select * from functional.zipcode_incomes where '
+          'zip >= "5"' % table_name)
+      # Write sequence file of size less than 4K
+      self.client.execute('insert into %s select * from functional.zipcode_incomes where '
+          'zip="00601"' % table_name)
+      # Read it back in Impala
+      output = self.client.execute('select count(*) from %s' % table_name)
+      assert '16541' == output.get_data()
+      # Read it back in Hive
+      output = self.run_stmt_in_hive('select count(*) from %s' % table_name)
+      assert '16541' == output.split('\n')[1]
 
   def test_avro_writer(self, vector):
     self.run_test_case('QueryTest/avro-writer', vector)
 
   def test_text_writer(self, vector):
-    # TODO debug this test, same as seq writer.
+    # TODO debug this test.
+    # This caused by a zlib failure. Suspected cause is too small a buffer
+    # passed to zlib for compression; similar to IMPALA-424
     pytest.skip()
     self.run_test_case('QueryTest/text-writer', vector)
 


[09/10] incubator-impala git commit: IMPALA-5251: Fix propagation of input exprs' types in 2-phase agg

Posted by kw...@apache.org.
IMPALA-5251: Fix propagation of input exprs' types in 2-phase agg

Since commit d2d3f4c (on asf-master), TAggregateExpr contains
the logical input types of the Aggregate Expr. The reason they
are included is that merging aggregate expressions will have
input types of the intermediate values which aren't necessarily
the same as the input types. For instance, NDV() uses a binary
blob as its intermediate value and it's passed to its merge
aggregate expressions as a StringVal but the input type of NDV()
in the query could be DecimalVal. In this case, we consider
DecimalVal as the logical input type while StringVal is the
intermediate type. The logical input types are accessed by the
BE via GetConstFnAttr() during interpretation and constant
propagation during codegen.

To handle distinct aggregate expressions (e.g. select count(distinct)),
the FE uses 2-phase aggregation by introducing an extra phase of
split/merge aggregation in which the distinct aggregate expressions'
inputs are converted and added to the group-by expressions in the first
phase while the non-distinct aggregate expressions go through the normal
split/merge treatment.
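
As a rough illustration of the 2-phase scheme described above, the
following Python sketch computes count(distinct key) together with a
non-distinct avg(value). It is only a toy model under stated assumptions:
the function name two_phase and the (sum, count) partial state are
hypothetical and are not taken from the Impala FE or BE.

```python
from collections import defaultdict


def two_phase(rows):
    """Toy model: rows is an iterable of (distinct_key, value) pairs.

    Returns (count_distinct_key, avg_value). Hypothetical helper, not Impala code.
    """
    # Phase 1: the distinct input becomes a group-by key, and the
    # non-distinct aggregate (avg) is kept as a partial [sum, count] per group.
    partials = defaultdict(lambda: [0.0, 0])
    for key, value in rows:
        partial = partials[key]
        partial[0] += value
        partial[1] += 1

    # Phase 2: counting the groups yields count(distinct key), while the
    # per-group partials are merged into the final avg.
    total = sum(p[0] for p in partials.values())
    count = sum(p[1] for p in partials.values())
    avg = total / count if count else None
    return len(partials), avg
```

Note that the second phase only ever sees the partial states, never the
original values, which is why the logical input types have to be carried
along separately.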

The bug is that the existing code incorrectly propagates the intermediate
types of the non-grouping aggregate expressions as the logical input types
to the merging aggregate expressions in the second phase of aggregation.
The input aggregate expressions for the non-distinct aggregate expressions
in the second phase aggregation are already merging aggregate expressions
(from phase one) in which case we should not treat its input types as
logical input types.

This change fixes the problem above by checking if the input aggregate
expression passed to FunctionCallExpr.createMergeAggCall() is already
a merging aggregate expression. If so, it will use the logical input
types recorded in its 'mergeAggInputFn_' as references for its logical
input types instead of the aggregate expression input types themselves.
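
The idea behind the fix can be sketched in a few lines of Python. This
is a simplified model, not the Java code in FunctionCallExpr: the class
AggCall, its fields, and create_merge_call are hypothetical stand-ins
that only mirror the decision made in createMergeAggCall().

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class AggCall:
    """Hypothetical stand-in for an aggregate function call expression."""
    fn_name: str
    arg_types: List[str]
    # Set only on merge calls; points at the call holding the logical inputs.
    merge_input: Optional["AggCall"] = None

    def is_merge(self) -> bool:
        return self.merge_input is not None

    def logical_input_types(self) -> List[str]:
        source = self.merge_input if self.is_merge() else self
        return source.arg_types


def create_merge_call(agg: AggCall, intermediate_types: List[str]) -> AggCall:
    # If 'agg' is already a merge call (second aggregation phase), its own
    # arguments are intermediate values, so keep referring to the original
    # non-merge call for the logical input types.
    origin = agg.merge_input if agg.is_merge() else agg
    return AggCall(agg.fn_name, intermediate_types, merge_input=origin)
```

With this model, an ndv() over a DECIMAL column whose intermediate type
is STRING keeps reporting DECIMAL as its logical input type after both
the first and the second merge step.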

Change-Id: I158303b20d1afdff23c67f3338b9c4af2ad80691
Reviewed-on: http://gerrit.cloudera.org:8080/6724
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/42ca45e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/42ca45e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/42ca45e8

Branch: refs/heads/master
Commit: 42ca45e8307ba4c831ad7ac8da86bbbd957fe4cd
Parents: e78d71e
Author: Michael Ho <kw...@cloudera.com>
Authored: Tue Apr 25 00:10:08 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Apr 26 21:40:32 2017 +0000

----------------------------------------------------------------------
 be/src/testutil/test-udas.cc                    | 93 ++++++++++----------
 .../impala/analysis/FunctionCallExpr.java       | 13 ++-
 .../queries/PlannerTest/aggregation.test        | 40 +++++++++
 .../functional-query/queries/QueryTest/uda.test | 52 +++++++++++
 tests/query_test/test_udfs.py                   |  5 ++
 5 files changed, 152 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/42ca45e8/be/src/testutil/test-udas.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/test-udas.cc b/be/src/testutil/test-udas.cc
index 549f2f0..806a971 100644
--- a/be/src/testutil/test-udas.cc
+++ b/be/src/testutil/test-udas.cc
@@ -57,36 +57,30 @@ StringVal AggFinalize(FunctionContext*, const StringVal& v) {
 
 // Defines AggIntermediate(int) returns BIGINT intermediate STRING
 void AggIntermediate(FunctionContext* context, const IntVal&, StringVal*) {}
-void AggIntermediateUpdate(FunctionContext* context, const IntVal&, StringVal*) {
+static void ValidateFunctionContext(const FunctionContext* context) {
   assert(context->GetNumArgs() == 1);
   assert(context->GetArgType(0)->type == FunctionContext::TYPE_INT);
   assert(context->GetIntermediateType().type == FunctionContext::TYPE_STRING);
   assert(context->GetReturnType().type == FunctionContext::TYPE_BIGINT);
 }
+void AggIntermediateUpdate(FunctionContext* context, const IntVal&, StringVal*) {
+  ValidateFunctionContext(context);
+}
 void AggIntermediateInit(FunctionContext* context, StringVal*) {
-  assert(context->GetNumArgs() == 1);
-  assert(context->GetArgType(0)->type == FunctionContext::TYPE_INT);
-  assert(context->GetIntermediateType().type == FunctionContext::TYPE_STRING);
-  assert(context->GetReturnType().type == FunctionContext::TYPE_BIGINT);
+  ValidateFunctionContext(context);
 }
 void AggIntermediateMerge(FunctionContext* context, const StringVal&, StringVal*) {
-  assert(context->GetNumArgs() == 1);
-  assert(context->GetArgType(0)->type == FunctionContext::TYPE_INT);
-  assert(context->GetIntermediateType().type == FunctionContext::TYPE_STRING);
-  assert(context->GetReturnType().type == FunctionContext::TYPE_BIGINT);
+  ValidateFunctionContext(context);
 }
 BigIntVal AggIntermediateFinalize(FunctionContext* context, const StringVal&) {
-  assert(context->GetNumArgs() == 1);
-  assert(context->GetArgType(0)->type == FunctionContext::TYPE_INT);
-  assert(context->GetIntermediateType().type == FunctionContext::TYPE_STRING);
-  assert(context->GetReturnType().type == FunctionContext::TYPE_BIGINT);
+  ValidateFunctionContext(context);
   return BigIntVal::null();
 }
 
 // Defines AggDecimalIntermediate(DECIMAL(1,2), INT) returns DECIMAL(5,6)
 // intermediate DECIMAL(3,4)
 // Useful to test that type parameters are plumbed through.
-void AggDecimalIntermediateUpdate(FunctionContext* context, const DecimalVal&, const IntVal&, DecimalVal*) {
+static void ValidateFunctionContext2(const FunctionContext* context) {
   assert(context->GetNumArgs() == 2);
   assert(context->GetArgType(0)->type == FunctionContext::TYPE_DECIMAL);
   assert(context->GetArgType(0)->precision == 2);
@@ -99,45 +93,50 @@ void AggDecimalIntermediateUpdate(FunctionContext* context, const DecimalVal&, c
   assert(context->GetReturnType().precision == 6);
   assert(context->GetReturnType().scale == 5);
 }
+void AggDecimalIntermediateUpdate(FunctionContext* context, const DecimalVal&,
+    const IntVal&, DecimalVal*) {
+  ValidateFunctionContext2(context);
+}
 void AggDecimalIntermediateInit(FunctionContext* context, DecimalVal*) {
-  assert(context->GetNumArgs() == 2);
-  assert(context->GetArgType(0)->type == FunctionContext::TYPE_DECIMAL);
-  assert(context->GetArgType(0)->precision == 2);
-  assert(context->GetArgType(0)->scale == 1);
-  assert(context->GetArgType(1)->type == FunctionContext::TYPE_INT);
-  assert(context->GetIntermediateType().type == FunctionContext::TYPE_DECIMAL);
-  assert(context->GetIntermediateType().precision == 4);
-  assert(context->GetIntermediateType().scale == 3);
-  assert(context->GetReturnType().type == FunctionContext::TYPE_DECIMAL);
-  assert(context->GetReturnType().precision == 6);
-  assert(context->GetReturnType().scale == 5);
+  ValidateFunctionContext2(context);
 }
-void AggDecimalIntermediateMerge(FunctionContext* context, const DecimalVal&, DecimalVal*) {
-  assert(context->GetNumArgs() == 2);
-  assert(context->GetArgType(0)->type == FunctionContext::TYPE_DECIMAL);
-  assert(context->GetArgType(0)->precision == 2);
-  assert(context->GetArgType(0)->scale == 1);
-  assert(context->GetArgType(1)->type == FunctionContext::TYPE_INT);
-  assert(context->GetIntermediateType().type == FunctionContext::TYPE_DECIMAL);
-  assert(context->GetIntermediateType().precision == 4);
-  assert(context->GetIntermediateType().scale == 3);
-  assert(context->GetReturnType().type == FunctionContext::TYPE_DECIMAL);
-  assert(context->GetReturnType().precision == 6);
-  assert(context->GetReturnType().scale == 5);
+void AggDecimalIntermediateMerge(FunctionContext* context, const DecimalVal&,
+    DecimalVal*) {
+  ValidateFunctionContext2(context);
 }
 DecimalVal AggDecimalIntermediateFinalize(FunctionContext* context, const DecimalVal&) {
-  assert(context->GetNumArgs() == 2);
+  ValidateFunctionContext2(context);
+  return DecimalVal::null();
+}
+
+// Defines AggStringIntermediate(DECIMAL(20,10), BIGINT, STRING) returns DECIMAL(20,0)
+// intermediate STRING.
+// Useful to test decimal input types with string as intermediate types.
+static void ValidateFunctionContext3(const FunctionContext* context) {
+  assert(context->GetNumArgs() == 3);
   assert(context->GetArgType(0)->type == FunctionContext::TYPE_DECIMAL);
-  assert(context->GetArgType(0)->precision == 2);
-  assert(context->GetArgType(0)->scale == 1);
-  assert(context->GetArgType(1)->type == FunctionContext::TYPE_INT);
-  assert(context->GetIntermediateType().type == FunctionContext::TYPE_DECIMAL);
-  assert(context->GetIntermediateType().precision == 4);
-  assert(context->GetIntermediateType().scale == 3);
+  assert(context->GetArgType(0)->precision == 20);
+  assert(context->GetArgType(0)->scale == 10);
+  assert(context->GetArgType(1)->type == FunctionContext::TYPE_BIGINT);
+  assert(context->GetArgType(2)->type == FunctionContext::TYPE_STRING);
+  assert(context->GetIntermediateType().type == FunctionContext::TYPE_STRING);
   assert(context->GetReturnType().type == FunctionContext::TYPE_DECIMAL);
-  assert(context->GetReturnType().precision == 6);
-  assert(context->GetReturnType().scale == 5);
-  return DecimalVal::null();
+  assert(context->GetReturnType().precision == 20);
+  assert(context->GetReturnType().scale == 0);
+}
+void AggStringIntermediateUpdate(FunctionContext* context, const DecimalVal&,
+    const BigIntVal&, const StringVal&, StringVal*) {
+  ValidateFunctionContext3(context);
+}
+void AggStringIntermediateInit(FunctionContext* context, StringVal*) {
+  ValidateFunctionContext3(context);
+}
+void AggStringIntermediateMerge(FunctionContext* context, const StringVal&, StringVal*) {
+  ValidateFunctionContext3(context);
+}
+DecimalVal AggStringIntermediateFinalize(FunctionContext* context, const StringVal&) {
+  ValidateFunctionContext3(context);
+  return DecimalVal(100);
 }
 
 // Defines MemTest(bigint) return bigint

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/42ca45e8/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index c9d098d..1e06254 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -102,8 +102,12 @@ public class FunctionCallExpr extends Expr {
       FunctionCallExpr agg, List<Expr> params) {
     Preconditions.checkState(agg.isAnalyzed());
     Preconditions.checkState(agg.isAggregateFunction());
+    // If the input aggregate function is already a merge aggregate function (due to
+    // 2-phase aggregation), its input types will be the intermediate value types. The
+    // original input argument exprs are in 'agg.mergeAggInputFn_' so use it instead.
+    FunctionCallExpr mergeAggInputFn = agg.isMergeAggFn() ? agg.mergeAggInputFn_ : agg;
     FunctionCallExpr result = new FunctionCallExpr(
-        agg.fnName_, new FunctionParams(false, params), agg);
+        agg.fnName_, new FunctionParams(false, params), mergeAggInputFn);
     // Inherit the function object from 'agg'.
     result.fn_ = agg.fn_;
     result.type_ = agg.type_;
@@ -127,8 +131,8 @@ public class FunctionCallExpr extends Expr {
     fnName_ = other.fnName_;
     isAnalyticFnCall_ = other.isAnalyticFnCall_;
     isInternalFnCall_ = other.isInternalFnCall_;
-    mergeAggInputFn_ =
-        other.mergeAggInputFn_ == null ? null : (FunctionCallExpr)other.mergeAggInputFn_.clone();
+    mergeAggInputFn_ = other.mergeAggInputFn_ == null ?
+        null : (FunctionCallExpr)other.mergeAggInputFn_.clone();
     // Clone the params in a way that keeps the children_ and the params.exprs()
     // in sync. The children have already been cloned in the super c'tor.
     if (other.params_.isStar()) {
@@ -574,7 +578,8 @@ public class FunctionCallExpr extends Expr {
   void validateMergeAggFn(FunctionCallExpr inputAggFn) {
     Preconditions.checkState(isMergeAggFn());
     List<Expr> copiedInputExprs = mergeAggInputFn_.getChildren();
-    List<Expr> inputExprs = inputAggFn.getChildren();
+    List<Expr> inputExprs = inputAggFn.isMergeAggFn() ?
+        inputAggFn.mergeAggInputFn_.getChildren() : inputAggFn.getChildren();
     Preconditions.checkState(copiedInputExprs.size() == inputExprs.size());
     for (int i = 0; i < inputExprs.size(); ++i) {
       Type copiedInputType = copiedInputExprs.get(i).getType();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/42ca45e8/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index a1177b0..b5c3970 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -554,6 +554,46 @@ PLAN-ROOT SINK
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
+# Mixed distinct and non-distinct agg with intermediate type different from input type
+# Regression test for IMPALA-5251 to exercise validateMergeAggFn() in FunctionCallExpr.
+select avg(l_quantity), ndv(l_discount), count(distinct l_partkey)
+from tpch_parquet.lineitem;
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(l_partkey), avg:merge(l_quantity), ndv:merge(l_discount)
+|
+01:AGGREGATE
+|  output: avg(l_quantity), ndv(l_discount)
+|  group by: l_partkey
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   partitions=1/1 files=3 size=193.74MB
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(l_partkey), avg:merge(l_quantity), ndv:merge(l_discount)
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: count(l_partkey), avg:merge(l_quantity), ndv:merge(l_discount)
+|
+04:AGGREGATE
+|  output: avg:merge(l_quantity), ndv:merge(l_discount)
+|  group by: l_partkey
+|
+03:EXCHANGE [HASH(l_partkey)]
+|
+01:AGGREGATE [STREAMING]
+|  output: avg(l_quantity), ndv(l_discount)
+|  group by: l_partkey
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   partitions=1/1 files=3 size=193.74MB
+====
 # test that aggregations are not placed below an unpartitioned exchange with a limit
 select count(*) from (select * from functional.alltypes limit 10) t
 ---- PLAN

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/42ca45e8/testdata/workloads/functional-query/queries/QueryTest/uda.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/uda.test b/testdata/workloads/functional-query/queries/QueryTest/uda.test
index 3a9bbbe..932b94a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/uda.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/uda.test
@@ -88,3 +88,55 @@ from functional.decimal_tbl
 NULL,5
 ---- TYPES
 decimal,bigint
+====
+---- QUERY
+# Test that all types are exposed via the FunctionContext correctly.
+# This includes distinct aggregate expression to test IMPALA-5251.
+# It also relies on asserts in the UDA funciton.
+select
+   agg_string_intermediate(cast(c1 as decimal(20,10)), 1000, "foobar"),
+   agg_decimal_intermediate(cast(c3 as decimal(2,1)), 2),
+   agg_intermediate(int_col),
+   avg(c2),
+   min(c3-c1),
+   max(c1+c3),
+   count(distinct int_col),
+   sum(distinct int_col)
+from
+   functional.alltypesagg,
+   functional.decimal_tiny
+---- RESULTS
+100,NULL,NULL,160.49989,-10.0989,11.8989,999,499500
+---- TYPES
+decimal,decimal,bigint,decimal,decimal,decimal,bigint,bigint
+====
+---- QUERY
+# Test that all types are exposed via the FunctionContext correctly.
+# This includes distinct aggregate expression to test IMPALA-5251.
+# It also relies on asserts in the UDA funciton.
+select
+   agg_string_intermediate(cast(c1 as decimal(20,10)), 1000, "foobar"),
+   agg_decimal_intermediate(cast(c3 as decimal(2,1)), 2),
+   agg_intermediate(int_col),
+   ndv(c2),
+   sum(distinct c1)/count(distinct c1)
+from
+   functional.alltypesagg,
+   functional.decimal_tiny
+group by
+   year,month,day
+---- RESULTS
+100,NULL,NULL,99,5.4994
+100,NULL,NULL,99,5.4994
+100,NULL,NULL,99,5.4994
+100,NULL,NULL,99,5.4994
+100,NULL,NULL,99,5.4994
+100,NULL,NULL,99,5.4994
+100,NULL,NULL,99,5.4994
+100,NULL,NULL,99,5.4994
+100,NULL,NULL,99,5.4994
+100,NULL,NULL,99,5.4994
+100,NULL,NULL,99,5.4994
+---- TYPES
+decimal,decimal,bigint,bigint,decimal
+====
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/42ca45e8/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 56ce233..ec24c9f 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -103,6 +103,11 @@ create aggregate function {database}.agg_decimal_intermediate(decimal(2,1), int)
 returns decimal(6,5) intermediate decimal(4,3) location '{location}'
 init_fn='AggDecimalIntermediateInit' update_fn='AggDecimalIntermediateUpdate'
 merge_fn='AggDecimalIntermediateMerge' finalize_fn='AggDecimalIntermediateFinalize';
+
+create aggregate function {database}.agg_string_intermediate(decimal(20,10), bigint, string)
+returns decimal(20,0) intermediate string location '{location}'
+init_fn='AggStringIntermediateInit' update_fn='AggStringIntermediateUpdate'
+merge_fn='AggStringIntermediateMerge' finalize_fn='AggStringIntermediateFinalize';
 """
 
   # Create test UDF functions in {database} from library {location}


[06/10] incubator-impala git commit: IMPALA-4893: Efficiently update the rows read counter for sequence file

Posted by kw...@apache.org.
IMPALA-4893: Efficiently update the rows read counter for sequence file

Update the rows read counter after processing the scan range instead of updating
it after reading every row for sequence files to save CPU cycles.
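
The pattern is to accumulate in a local variable and publish once. A
minimal Python sketch, assuming a hypothetical Counter class whose add()
stands in for the comparatively costly shared counter update (neither
name comes from the Impala code base):

```python
class Counter:
    """Hypothetical shared counter; 'updates' tracks how often add() ran."""

    def __init__(self):
        self.value = 0
        self.updates = 0

    def add(self, n):
        self.value += n
        self.updates += 1


def process_range(records, counter, limit=None):
    # Count rows locally while scanning the range.
    num_rows_read = 0
    for _ in records:
        num_rows_read += 1
        # Leave the loop with 'break' rather than returning early, so the
        # single counter update below still runs on this exit path.
        if limit is not None and num_rows_read >= limit:
            break
    # One update per scan range instead of one per row.
    counter.add(num_rows_read)
    return num_rows_read
```

Scanning 1000 records this way touches the shared counter once rather
than 1000 times, while the final value is unchanged.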

Change-Id: Ie42c97a36e46172884cc497aa645036c2c11f541
Reviewed-on: http://gerrit.cloudera.org:8080/6522
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5809317c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5809317c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5809317c

Branch: refs/heads/master
Commit: 5809317c9a202159ac28e0565d04151eae843d09
Parents: 59b2db6
Author: aphadke <ap...@cloudera.com>
Authored: Thu Mar 30 18:05:52 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Apr 26 01:12:01 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-sequence-scanner.cc                         | 7 ++++---
 .../queries/QueryTest/hdfs_scanner_profile.test              | 8 ++++++++
 tests/query_test/test_scanners.py                            | 7 +++++++
 3 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5809317c/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index d802bd7..275f96b 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -294,6 +294,7 @@ Status HdfsSequenceScanner::ProcessRange() {
   // We count the time here since there is too much overhead to do
   // this on each record.
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
+  int64_t num_rows_read = 0;
 
   while (!finished()) {
     DCHECK_GT(record_locations_.size(), 0);
@@ -336,13 +337,12 @@ Status HdfsSequenceScanner::ProcessRange() {
     } else {
       add_row = WriteTemplateTuples(tuple_row_mem, 1);
     }
-
-    COUNTER_ADD(scan_node_->rows_read_counter(), 1);
+    num_rows_read++;
     if (add_row) RETURN_IF_ERROR(CommitRows(1));
     if (scan_node_->ReachedLimit()) break;
 
     // Sequence files don't end with syncs
-    if (stream_->eof()) return Status::OK();
+    if (stream_->eof())  break;
 
     // Check for sync by looking for the marker that precedes syncs.
     int marker;
@@ -353,6 +353,7 @@ Status HdfsSequenceScanner::ProcessRange() {
     }
   }
 
+  COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5809317c/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
new file mode 100644
index 0000000..ea459e4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
@@ -0,0 +1,8 @@
+====
+---- QUERY
+# This query will do a full table scan to count the num of rows
+# read during a scan
+select * from alltypesagg
+---- RUNTIME_PROFILE
+row_regex: .*RowsRead: 11.00K .
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5809317c/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 8ba2e0b..b0e2e80 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -68,6 +68,13 @@ class TestScannersAllTableFormats(ImpalaTestSuite):
     new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size')
     self.run_test_case('QueryTest/scanners', new_vector)
 
+  def test_hdfs_scanner_profile(self, vector):
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['num_nodes'] = 1
+    if new_vector.get_value('table_format').file_format in ('kudu', 'hbase'):
+      pytest.skip()
+    self.run_test_case('QueryTest/hdfs_scanner_profile', new_vector)
+
 # Test all the scanners with a simple limit clause. The limit clause triggers
 # cancellation in the scanner code paths.
 class TestScannersAllTableFormatsWithLimit(ImpalaTestSuite):


[02/10] incubator-impala git commit: IMPALA-5031: remove undefined behavior: nullptr member function call

Posted by kw...@apache.org.
IMPALA-5031: remove undefined behavior: nullptr member function call

Coordinator::executor() called a member function through a null pointer
to a FragmentInstanceState. This undefined behavior was hit during data
loading, end-to-end tests, and custom cluster tests.
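
As a minimal sketch of the guard pattern, assuming hypothetical stand-in
types (Executor, InstanceState and Owner are illustrative names, not
Impala's real classes): calling a member function through a null pointer
is undefined behavior even when the callee never dereferences 'this', so
the pointer is checked before the call.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-ins for illustration only; not Impala's real classes.
struct Executor {
  int id = 7;
};

class InstanceState {
 public:
  Executor* executor() { return &executor_; }

 private:
  Executor executor_;
};

class Owner {
 public:
  // Returns nullptr while no instance exists instead of invoking a member
  // function through a null pointer, which would be undefined behavior.
  Executor* executor() {
    return (instance_ == nullptr) ? nullptr : instance_->executor();
  }

  // Creates the instance; after this, executor() returns a valid pointer.
  void Start() { instance_.reset(new InstanceState()); }

 private:
  std::unique_ptr<InstanceState> instance_;
};

// Small usage example: the accessor is safe both before and after Start().
void Example() {
  Owner owner;
  assert(owner.executor() == nullptr);
  owner.Start();
  assert(owner.executor() != nullptr);
}
```

Callers of such an accessor then have to tolerate a nullptr result.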

Change-Id: I2377c14f7ea8f93bb96504dd2319c11ff709cd26
Reviewed-on: http://gerrit.cloudera.org:8080/6714
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c1463ff6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c1463ff6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c1463ff6

Branch: refs/heads/master
Commit: c1463ff6b1db163ead319636c419ee3b3a0d3e49
Parents: 72cf359
Author: Jim Apple <jb...@apache.org>
Authored: Fri Apr 21 15:18:59 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sun Apr 23 03:26:07 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1463ff6/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 3fcdb54..a8284c2 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -376,7 +376,7 @@ Coordinator::~Coordinator() {
 }
 
 PlanFragmentExecutor* Coordinator::executor() {
-  return coord_instance_->executor();
+  return (coord_instance_ == nullptr) ? nullptr : coord_instance_->executor();
 }
 
 TExecNodePhase::type GetExecNodePhase(const string& key) {


[04/10] incubator-impala git commit: IMPALA-5244 test_hdfs_file_open_fail fails on local filesystem build

Posted by kw...@apache.org.
IMPALA-5244 test_hdfs_file_open_fail fails on local filesystem build

This test must be skipped on non-HDFS filesystems.

Change-Id: I5318a5eb27b15fed5df770b9c3ea23e7e1a97a4c
Reviewed-on: http://gerrit.cloudera.org:8080/6723
Reviewed-by: Michael Ho <kw...@cloudera.com>
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/edcc593e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/edcc593e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/edcc593e

Branch: refs/heads/master
Commit: edcc593ee541bb2677d58e31fe713f2cb4951937
Parents: 915a163
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Mon Apr 24 23:22:37 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Apr 25 10:50:18 2017 +0000

----------------------------------------------------------------------
 tests/common/skip.py                  | 1 +
 tests/data_errors/test_data_errors.py | 5 +++--
 2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/edcc593e/tests/common/skip.py
----------------------------------------------------------------------
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 232eab5..e330712 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -61,6 +61,7 @@ class SkipIf:
   kudu_not_supported = pytest.mark.skipif(os.environ["KUDU_IS_SUPPORTED"] == "false",
       reason="Kudu is not supported")
   not_s3 = pytest.mark.skipif(not IS_S3, reason="S3 Filesystem needed")
+  not_hdfs = pytest.mark.skipif(not IS_HDFS, reason="HDFS Filesystem needed")
   no_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM,
       reason="Secondary filesystem needed")
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/edcc593e/tests/data_errors/test_data_errors.py
----------------------------------------------------------------------
diff --git a/tests/data_errors/test_data_errors.py b/tests/data_errors/test_data_errors.py
index 92ae707..60d70b7 100644
--- a/tests/data_errors/test_data_errors.py
+++ b/tests/data_errors/test_data_errors.py
@@ -24,7 +24,7 @@ import random
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfS3, SkipIfLocal
+from tests.common.skip import SkipIf, SkipIfS3, SkipIfLocal
 from tests.common.test_dimensions import create_exec_option_dimension
 
 class TestDataErrors(ImpalaTestSuite):
@@ -42,7 +42,8 @@ class TestDataErrors(ImpalaTestSuite):
   def get_workload(self):
     return 'functional-query'
 
-# Regression test for IMP-633. Added as a part of IMPALA-5198
+# Regression test for IMP-633. Added as a part of IMPALA-5198.
+@SkipIf.not_hdfs
 class TestHdfsFileOpenFailErrors(ImpalaTestSuite):
   @pytest.mark.execute_serially
   def test_hdfs_file_open_fail(self):


[08/10] incubator-impala git commit: IMPALA-5192: Don't bake MemPool* into IR

Posted by kw...@apache.org.
IMPALA-5192: Don't bake MemPool* into IR

Tuple::CodegenMaterializeExprs() currently bakes the MemPool*
provided by its caller into the generated IR. The MemPool*
usually belongs to the exec node that owns the codegen'd
function, and it is used for allocating string buffers. With
multi-threading, IR needs to be shared across multiple fragment
instances, so IR can no longer contain pointers that are not
shared across fragment instances.

This change fixes the problem above by using the MemPool*
argument passed to the IR function. It also cleans up
UnionNode by removing its tuple_pool_ field and the logic
for transferring buffers from tuple_pool_ to the MemPool of
the row batch.
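
The difference between the two approaches can be sketched without LLVM,
using closures as a rough analogy for generated functions. Everything
below is an assumption made for illustration: Pool, GenerateBaked and
GenerateShared are hypothetical names, not part of Impala or LLVM.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>

// Hypothetical, simplified pool that owns copies of var-len data.
struct Pool {
  std::deque<std::string> strings;
  const std::string& Copy(const std::string& s) {
    strings.push_back(s);
    return strings.back();
  }
};

// "Before": the pool pointer is fixed when the function is generated, so
// the function is only usable by the instance that owns that pool.
using BakedFn = std::function<void(const std::string&)>;
BakedFn GenerateBaked(Pool* pool) {
  assert(pool != nullptr);
  return [pool](const std::string& s) { pool->Copy(s); };
}

// "After": generation only records whether a pool is used at all; the pool
// itself is an ordinary argument supplied by each caller.
using SharedFn = std::function<void(const std::string&, Pool*)>;
SharedFn GenerateShared(bool use_pool) {
  return [use_pool](const std::string& s, Pool* pool) {
    if (use_pool && pool != nullptr) pool->Copy(s);
  };
}

// One shared function serving two independent "instances".
void Example() {
  Pool a, b;
  SharedFn fn = GenerateShared(true);
  fn("x", &a);
  fn("yy", &b);
  assert(a.strings.size() == 1);
  assert(b.strings.size() == 1);
}
```

In this analogy a single SharedFn can serve any number of instances,
whereas each BakedFn is tied to the one pool it was generated with.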

Change-Id: I09d620e48032351ab9805825a4afb6536bed2302
Reviewed-on: http://gerrit.cloudera.org:8080/6657
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e78d71e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e78d71e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e78d71e6

Branch: refs/heads/master
Commit: e78d71e63328397ffdb59066982c0c6e83feb3d9
Parents: e2c53a8
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Apr 19 19:12:01 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Apr 26 20:46:02 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/codegen-anyval.cc | 36 ++++++++------
 be/src/codegen/codegen-anyval.h  | 23 +++++----
 be/src/codegen/llvm-codegen.cc   | 21 ++++----
 be/src/codegen/llvm-codegen.h    |  7 +--
 be/src/exec/topn-node-ir.cc      |  6 +--
 be/src/exec/topn-node.cc         |  4 +-
 be/src/exec/union-node-ir.cc     |  3 +-
 be/src/exec/union-node.cc        |  9 +---
 be/src/exec/union-node.h         |  7 ---
 be/src/runtime/tuple.cc          | 92 +++++++++++++++--------------------
 be/src/runtime/tuple.h           | 20 ++++----
 11 files changed, 107 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/codegen/codegen-anyval.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/codegen-anyval.cc b/be/src/codegen/codegen-anyval.cc
index a778812..8000dfd 100644
--- a/be/src/codegen/codegen-anyval.cc
+++ b/be/src/codegen/codegen-anyval.cc
@@ -511,7 +511,7 @@ void CodegenAnyVal::SetFromRawValue(Value* raw_val) {
   }
 }
 
-Value* CodegenAnyVal::ToNativeValue(MemPool* pool) {
+Value* CodegenAnyVal::ToNativeValue(Value* pool_val) {
   Type* raw_type = codegen_->GetType(type_);
   Value* raw_val = Constant::getNullValue(raw_type);
   switch (type_.type) {
@@ -520,12 +520,13 @@ Value* CodegenAnyVal::ToNativeValue(MemPool* pool) {
       // Convert StringVal to StringValue
       Value* len = GetLen();
       raw_val = builder_->CreateInsertValue(raw_val, len, 1);
-      if (pool == NULL) {
+      if (pool_val == nullptr) {
         // Set raw_val.ptr from this->ptr
         raw_val = builder_->CreateInsertValue(raw_val, GetPtr(), 0);
       } else {
-        // Allocate raw_val.ptr from 'pool' and copy this->ptr
-        Value* new_ptr = codegen_->CodegenAllocate(builder_, pool, len, "new_ptr");
+        // Allocate raw_val.ptr from 'pool_val' and copy this->ptr
+        Value* new_ptr =
+            codegen_->CodegenMemPoolAllocate(builder_, pool_val, len, "new_ptr");
         codegen_->CodegenMemcpy(builder_, new_ptr, GetPtr(), len);
         raw_val = builder_->CreateInsertValue(raw_val, new_ptr, 0);
       }
@@ -560,9 +561,9 @@ Value* CodegenAnyVal::ToNativeValue(MemPool* pool) {
   return raw_val;
 }
 
-Value* CodegenAnyVal::ToNativePtr(Value* native_ptr, MemPool* pool) {
-  Value* v = ToNativeValue(pool);
-  if (native_ptr == NULL) {
+Value* CodegenAnyVal::ToNativePtr(Value* native_ptr, Value* pool_val) {
+  Value* v = ToNativeValue(pool_val);
+  if (native_ptr == nullptr) {
     native_ptr = codegen_->CreateEntryBlockAlloca(*builder_, v->getType());
   }
   builder_->CreateStore(v, native_ptr);
@@ -589,15 +590,17 @@ Value* CodegenAnyVal::ToNativePtr(Value* native_ptr, MemPool* pool) {
 //
 // end_write:                                        ; preds = %null, %non_null
 //   ; [insert point ends here]
-void CodegenAnyVal::WriteToSlot(const SlotDescriptor& slot_desc, Value* tuple,
-    MemPool* pool, BasicBlock* insert_before) {
-  DCHECK(tuple->getType()->isPointerTy());
-  DCHECK(tuple->getType()->getPointerElementType()->isStructTy());
+void CodegenAnyVal::WriteToSlot(const SlotDescriptor& slot_desc, Value* tuple_val,
+    Value* pool_val, BasicBlock* insert_before) {
+  DCHECK(tuple_val->getType()->isPointerTy());
+  DCHECK(tuple_val->getType()->getPointerElementType()->isStructTy());
   LLVMContext& context = codegen_->context();
   Function* fn = builder_->GetInsertBlock()->getParent();
 
   // Create new block that will come after conditional blocks if necessary
-  if (insert_before == NULL) insert_before = BasicBlock::Create(context, "end_write", fn);
+  if (insert_before == nullptr) {
+    insert_before = BasicBlock::Create(context, "end_write", fn);
+  }
 
   // Create new basic blocks and br instruction
   BasicBlock* non_null_block = BasicBlock::Create(context, "non_null", fn, insert_before);
@@ -606,14 +609,15 @@ void CodegenAnyVal::WriteToSlot(const SlotDescriptor& slot_desc, Value* tuple,
 
   // Non-null block: write slot
   builder_->SetInsertPoint(non_null_block);
-  Value* slot = builder_->CreateStructGEP(NULL, tuple, slot_desc.llvm_field_idx(),
-      "slot");
-  ToNativePtr(slot, pool);
+  Value* slot =
+      builder_->CreateStructGEP(nullptr, tuple_val, slot_desc.llvm_field_idx(), "slot");
+  ToNativePtr(slot, pool_val);
   builder_->CreateBr(insert_before);
 
   // Null block: set null bit
   builder_->SetInsertPoint(null_block);
-  slot_desc.CodegenSetNullIndicator(codegen_, builder_, tuple, codegen_->true_value());
+  slot_desc.CodegenSetNullIndicator(
+      codegen_, builder_, tuple_val, codegen_->true_value());
   builder_->CreateBr(insert_before);
 
   // Leave builder_ after conditional blocks

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/codegen/codegen-anyval.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/codegen-anyval.h b/be/src/codegen/codegen-anyval.h
index 13494ac..696f542 100644
--- a/be/src/codegen/codegen-anyval.h
+++ b/be/src/codegen/codegen-anyval.h
@@ -82,7 +82,7 @@ class CodegenAnyVal {
   /// 'name' optionally specifies the name of the returned value.
   static llvm::Value* CreateCall(LlvmCodeGen* cg, LlvmBuilder* builder,
       llvm::Function* fn, llvm::ArrayRef<llvm::Value*> args, const char* name = "",
-      llvm::Value* result_ptr = NULL);
+      llvm::Value* result_ptr = nullptr);
 
   /// Same as above but wraps the result in a CodegenAnyVal.
   static CodegenAnyVal CreateCallWrapped(LlvmCodeGen* cg, LlvmBuilder* builder,
@@ -135,7 +135,7 @@ class CodegenAnyVal {
   //
   /// If 'name' is specified, it will be used when generated instructions that set value_.
   CodegenAnyVal(LlvmCodeGen* codegen, LlvmBuilder* builder, const ColumnType& type,
-      llvm::Value* value = NULL, const char* name = "");
+      llvm::Value* value = nullptr, const char* name = "");
 
   /// Returns the current type-lowered value.
   llvm::Value* GetLoweredValue() const { return value_; }
@@ -197,16 +197,19 @@ class CodegenAnyVal {
   /// Converts this *Val's value to a native type, StringValue, TimestampValue, etc.
   /// This should only be used if this *Val is not null.
   ///
-  /// If 'pool' is non-NULL, var-len data will be copied into 'pool'.
-  llvm::Value* ToNativeValue(MemPool* pool = NULL);
+  /// If 'pool_val' is non-NULL, var-len data will be copied into 'pool_val'.
+  /// 'pool_val' has to be of type MemPool*.
+  llvm::Value* ToNativeValue(llvm::Value* pool_val = nullptr);
 
   /// Sets 'native_ptr' to this *Val's value. If non-NULL, 'native_ptr' should be a
   /// pointer to a native type, StringValue, TimestampValue, etc. If NULL, a pointer is
   /// alloca'd. In either case the pointer is returned. This should only be used if this
   /// *Val is not null.
   ///
-  /// If 'pool' is non-NULL, var-len data will be copied into 'pool'.
-  llvm::Value* ToNativePtr(llvm::Value* native_ptr = NULL, MemPool* pool = NULL);
+  /// If 'pool_val' is non-NULL, var-len data will be copied into 'pool_val'.
+  /// 'pool_val' has to be of type MemPool*.
+  llvm::Value* ToNativePtr(
+      llvm::Value* native_ptr = nullptr, llvm::Value* pool_val = nullptr);
 
   /// Writes this *Val's value to the appropriate slot in 'tuple' if non-null, or sets the
   /// appropriate null bit if null. This assumes null bits are initialized to 0. Analogous
@@ -218,9 +221,10 @@ class CodegenAnyVal {
   /// 'insert_before' if specified, or a new basic block created at the end of the
   /// function if 'insert_before' is NULL.
   ///
-  /// If 'pool' is non-NULL, var-len data will be copied into 'pool'.
+  /// If 'pool_val' is non-NULL, var-len data will be copied into 'pool_val'.
+  /// 'pool_val' has to be of type MemPool*.
   void WriteToSlot(const SlotDescriptor& slot_desc, llvm::Value* tuple,
-      MemPool* pool = NULL, llvm::BasicBlock* insert_before = NULL);
+      llvm::Value* pool_val, llvm::BasicBlock* insert_before = nullptr);
 
   /// Returns the i1 result of this == other. this and other must be non-null.
   llvm::Value* Eq(CodegenAnyVal* other);
@@ -252,7 +256,8 @@ class CodegenAnyVal {
 
   /// Ctor for created an uninitialized CodegenAnYVal that can be assigned to later.
   CodegenAnyVal()
-    : type_(INVALID_TYPE), value_(NULL), name_(NULL), codegen_(NULL), builder_(NULL) {}
+    : type_(INVALID_TYPE), value_(nullptr), name_(nullptr),
+      codegen_(nullptr), builder_(nullptr) {}
 
  private:
   ColumnType type_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 4ad5c93..cdc0c53 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -1381,20 +1381,19 @@ void LlvmCodeGen::CodegenClearNullBits(LlvmBuilder* builder, Value* tuple_ptr,
   CodegenMemset(builder, null_bytes_ptr, 0, tuple_desc.num_null_bytes());
 }
 
-Value* LlvmCodeGen::CodegenAllocate(LlvmBuilder* builder, MemPool* pool, Value* size,
-    const char* name) {
-  DCHECK(pool != NULL);
-  DCHECK(size->getType()->isIntegerTy());
-  DCHECK_LE(size->getType()->getIntegerBitWidth(), 64);
-  // Extend 'size' to i64 if necessary
-  if (size->getType()->getIntegerBitWidth() < 64) {
-    size = builder->CreateSExt(size, bigint_type());
+Value* LlvmCodeGen::CodegenMemPoolAllocate(LlvmBuilder* builder, Value* pool_val,
+    Value* size_val, const char* name) {
+  DCHECK(pool_val != nullptr);
+  DCHECK(size_val->getType()->isIntegerTy());
+  DCHECK_LE(size_val->getType()->getIntegerBitWidth(), 64);
+  DCHECK_EQ(pool_val->getType(), GetPtrType(MemPool::LLVM_CLASS_NAME));
+  // Extend 'size_val' to i64 if necessary
+  if (size_val->getType()->getIntegerBitWidth() < 64) {
+    size_val = builder->CreateSExt(size_val, bigint_type());
   }
   Function* allocate_fn = GetFunction(IRFunction::MEMPOOL_ALLOCATE, false);
-  PointerType* pool_type = GetPtrType(MemPool::LLVM_CLASS_NAME);
-  Value* pool_val = CastPtrToLlvmPtr(pool_type, pool);
   Value* alignment = GetIntConstant(TYPE_INT, MemPool::DEFAULT_ALIGNMENT);
-  Value* fn_args[] = {pool_val, size, alignment};
+  Value* fn_args[] = {pool_val, size_val, alignment};
   return builder->CreateCall(allocate_fn, fn_args, name);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/codegen/llvm-codegen.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 7259081..5c26c14 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -495,9 +495,10 @@ class LlvmCodeGen {
   void CodegenClearNullBits(LlvmBuilder* builder, llvm::Value* tuple_ptr,
       const TupleDescriptor& tuple_desc);
 
-  /// Codegen to call pool->Allocate(size).
-  llvm::Value* CodegenAllocate(LlvmBuilder* builder, MemPool* pool, llvm::Value* size,
-      const char* name = "");
+  /// Codegen to call pool_val->Allocate(size_val).
+  /// 'pool_val' has to be of type MemPool*.
+  llvm::Value* CodegenMemPoolAllocate(LlvmBuilder* builder, llvm::Value* pool_val,
+      llvm::Value* size_val, const char* name = "");
 
   /// Codegens IR to load array[idx] and returns the loaded value. 'array' should be a
   /// C-style array (e.g. i32*) or an IR array (e.g. [10 x i32]). This function does not

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/exec/topn-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node-ir.cc b/be/src/exec/topn-node-ir.cc
index 87f671f..f0eaeaa 100644
--- a/be/src/exec/topn-node-ir.cc
+++ b/be/src/exec/topn-node-ir.cc
@@ -27,7 +27,7 @@ void TopNNode::InsertBatch(RowBatch* batch) {
 
 // Insert if either not at the limit or it's a new TopN tuple_row
 void TopNNode::InsertTupleRow(TupleRow* input_row) {
-  Tuple* insert_tuple = NULL;
+  Tuple* insert_tuple = nullptr;
 
   if (priority_queue_->size() < limit_ + offset_) {
     insert_tuple = reinterpret_cast<Tuple*>(
@@ -38,7 +38,7 @@ void TopNNode::InsertTupleRow(TupleRow* input_row) {
     DCHECK(!priority_queue_->empty());
     Tuple* top_tuple = priority_queue_->top();
     tmp_tuple_->MaterializeExprs<false, true>(input_row, *materialized_tuple_desc_,
-        sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), NULL);
+        sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), nullptr);
     if (tuple_row_less_than_->Less(tmp_tuple_, top_tuple)) {
       // TODO: DeepCopy() will allocate new buffers for the string data. This needs
       // to be fixed to use a freelist
@@ -48,5 +48,5 @@ void TopNNode::InsertTupleRow(TupleRow* input_row) {
     }
   }
 
-  if (insert_tuple != NULL) priority_queue_->push(insert_tuple);
+  if (insert_tuple != nullptr) priority_queue_->push(insert_tuple);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/exec/topn-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index b79191f..54bfe8f 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -105,12 +105,12 @@ void TopNNode::Codegen(RuntimeState* state) {
 
     codegen_status = Tuple::CodegenMaterializeExprs(codegen, false,
         *materialized_tuple_desc_, sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
-        tuple_pool_.get(), &materialize_exprs_tuple_pool_fn);
+        true, &materialize_exprs_tuple_pool_fn);
 
     if (codegen_status.ok()) {
       codegen_status = Tuple::CodegenMaterializeExprs(codegen, false,
           *materialized_tuple_desc_, sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
-          NULL, &materialize_exprs_no_pool_fn);
+          false, &materialize_exprs_no_pool_fn);
 
       if (codegen_status.ok()) {
         int replaced = codegen->ReplaceCallSites(insert_batch_fn,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/exec/union-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node-ir.cc b/be/src/exec/union-node-ir.cc
index 38dfc97..3d19f07 100644
--- a/be/src/exec/union-node-ir.cc
+++ b/be/src/exec/union-node-ir.cc
@@ -25,7 +25,8 @@ void IR_ALWAYS_INLINE UnionNode::MaterializeExprs(const std::vector<ExprContext*
   DCHECK(!dst_batch->AtCapacity());
   Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
   TupleRow* dst_row = dst_batch->GetRow(dst_batch->AddRow());
-  dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_, exprs, tuple_pool_.get());
+  dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_, exprs,
+      dst_batch->tuple_data_pool());
   dst_row->SetTuple(0, dst_tuple);
   dst_batch->CommitLastRow();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index 0da1760..cd16bea 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -37,7 +37,6 @@ UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode,
       tuple_id_(tnode.union_node.tuple_id),
       tuple_desc_(nullptr),
       first_materialized_child_idx_(tnode.union_node.first_materialized_child_idx),
-      tuple_pool_(nullptr),
       child_idx_(0),
       child_batch_(nullptr),
       child_row_idx_(0),
@@ -71,7 +70,6 @@ Status UnionNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
   DCHECK(tuple_desc_ != nullptr);
-  tuple_pool_.reset(new MemPool(mem_tracker()));
   codegend_union_materialize_batch_fns_.resize(child_expr_lists_.size());
 
   // Prepare const expr lists.
@@ -105,7 +103,7 @@ void UnionNode::Codegen(RuntimeState* state) {
 
     llvm::Function* tuple_materialize_exprs_fn;
     codegen_status = Tuple::CodegenMaterializeExprs(codegen, false, *tuple_desc_,
-        child_expr_lists_[i], tuple_pool_.get(), &tuple_materialize_exprs_fn);
+        child_expr_lists_[i], true, &tuple_materialize_exprs_fn);
     if (!codegen_status.ok()) {
       // Codegen may fail in some corner cases (e.g. we don't handle TYPE_CHAR). If this
       // happens, abort codegen for this and the remaining children.
@@ -272,8 +270,6 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
-  // The tuple pool should be empty between GetNext() calls.
-  DCHECK_EQ(tuple_pool_.get()->GetTotalChunkSizes(), 0);
 
   if (to_close_child_idx_ != -1) {
     // The previous child needs to be closed if passthrough was enabled for it. In the non
@@ -309,8 +305,6 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   *eos = ReachedLimit() ||
       (!HasMorePassthrough() && !HasMoreMaterialized() && !HasMoreConst(state));
 
-  // Attach the memory in the tuple pool (if any) to the row batch.
-  row_batch->tuple_data_pool()->AcquireData(tuple_pool_.get(), false);
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   return Status::OK();
 }
@@ -336,6 +330,5 @@ void UnionNode::Close(RuntimeState* state) {
   for (const vector<ExprContext*>& exprs : child_expr_lists_) {
     Expr::Close(exprs, state);
   }
-  if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll();
   ExecNode::Close(state);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/exec/union-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index b1715f9..311ae17 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -65,13 +65,6 @@ class UnionNode : public ExecNode {
   /// materialized.
   const int first_materialized_child_idx_;
 
-  /// Used by MaterializeExprs() to materialize var-len slots. The ownership of the memory
-  /// in this pool should be transferred to the row batch at the end of each GetNext()
-  /// call. The memory can't be attached to the row batch in MaterializeExprs() because
-  /// the pointer to the mem pool is hard coded in the codegen'ed MaterializeExprs().
-  /// TODO (IMPALA-5192): Remove this when no longer necessary in the future.
-  boost::scoped_ptr<MemPool> tuple_pool_;
-
   /// Const exprs materialized by this node. These exprs don't refer to any children.
   /// Only materialized by the first fragment instance to avoid duplication.
   std::vector<std::vector<ExprContext*>> const_expr_lists_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/runtime/tuple.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index dc83922..67a238d 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -236,7 +236,7 @@ void Tuple::MaterializeExprs(
 // writes. If 'pool' is non-NULL, string data is copied into it. Note that the generated
 // function ignores its 'pool' arg; instead we hardcode the pointer in the IR.
 //
-// Example IR for materializing an int column and a string column with non-NULL 'pool':
+// Example IR for materializing a string column with non-NULL 'pool':
 //
 // ; Function Attrs: alwaysinline
 // define void @MaterializeExprs(%"class.impala::Tuple"* %opaque_tuple,
@@ -244,72 +244,59 @@ void Tuple::MaterializeExprs(
 //     %"class.impala::ExprContext"** %materialize_expr_ctxs,
 //     %"class.impala::MemPool"* %pool,
 //     %"struct.impala::StringValue"** %non_null_string_values,
-//     i32* %total_string_lengths) #20 {
+//     i32* %total_string_lengths, i32* %num_non_null_string_values) #34 {
 // entry:
 //   %tuple = bitcast %"class.impala::Tuple"* %opaque_tuple to
-//       { i8, i32, %"struct.impala::StringValue" }*
-//   %0 = bitcast { i8, i32, %"struct.impala::StringValue" }* %tuple to i8*
-//   call void @llvm.memset.p0i8.i64(i8* %0, i8 0, i64 1, i32 0, i1 false)
-//   %1 = getelementptr %"class.impala::ExprContext"** %materialize_expr_ctxs, i32 0
-//   %expr_ctx = load %"class.impala::ExprContext"** %1
-//   %src = call i64 @GetSlotRef4(%"class.impala::ExprContext"* %expr_ctx,
-//       %"class.impala::TupleRow"* %row)
+//       <{ %"struct.impala::StringValue", i8 }>*
+//   %int8_ptr = bitcast <{ %"struct.impala::StringValue", i8 }>* %tuple to i8*
+//   %null_bytes_ptr = getelementptr inbounds i8, i8* %int8_ptr, i32 16
+//   call void @llvm.memset.p0i8.i64(i8* %null_bytes_ptr, i8 0, i64 1, i32 0, i1 false)
+//   %0 = getelementptr %"class.impala::ExprContext"*,
+//       %"class.impala::ExprContext"** %materialize_expr_ctxs, i32 0
+//   %expr_ctx = load %"class.impala::ExprContext"*, %"class.impala::ExprContext"** %0
+//   %src = call { i64, i8* } @"impala::StringFunctions::UpperWrapper"(
+//        %"class.impala::ExprContext"* %expr_ctx, %"class.impala::TupleRow"* %row)
+//   %1 = extractvalue { i64, i8* } %src, 0
 //   ; ----- generated by CodegenAnyVal::WriteToSlot() ----------------------------------
-//   %is_null = trunc i64 %src to i1
+//   %is_null = trunc i64 %1 to i1
 //   br i1 %is_null, label %null, label %non_null
 //
 // non_null:                                         ; preds = %entry
-//   %slot = getelementptr inbounds { i8, i32, %"struct.impala::StringValue" }* %tuple,
-//       i32 0, i32 1
-//   %2 = ashr i64 %src, 32
-//   %3 = trunc i64 %2 to i32
-//   store i32 %3, i32* %slot
+//   %slot = getelementptr inbounds <{ %"struct.impala::StringValue", i8 }>,
+//       <{ %"struct.impala::StringValue", i8 }>* %tuple, i32 0, i32 0
+//   %2 = extractvalue { i64, i8* } %src, 0
+//   %3 = ashr i64 %2, 32
+//   %4 = trunc i64 %3 to i32
+//   %5 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %4, 1
+//   %6 = sext i32 %4 to i64
+//   %new_ptr = call i8* @_ZN6impala7MemPool8AllocateILb0EEEPhli(
+//       %"class.impala::MemPool"* %pool, i64 %6, i32 8)
+//   %src1 = extractvalue { i64, i8* } %src, 1
+//   call void @llvm.memcpy.p0i8.p0i8.i32(
+//       i8* %new_ptr, i8* %src1, i32 %4, i32 0, i1 false)
+//   %7 = insertvalue %"struct.impala::StringValue" %5, i8* %new_ptr, 0
+//   store %"struct.impala::StringValue" %7, %"struct.impala::StringValue"* %slot
 //   br label %end_write
 //
 // null:                                             ; preds = %entry
-//   call void @SetNull6({ i8, i32, %"struct.impala::StringValue" }* %tuple)
+//   %8 = bitcast <{ %"struct.impala::StringValue", i8 }>* %tuple to i8*
+//   %null_byte_ptr = getelementptr inbounds i8, i8* %8, i32 16
+//   %null_byte = load i8, i8* %null_byte_ptr
+//   %null_bit_set = or i8 %null_byte, 1
+//   store i8 %null_bit_set, i8* %null_byte_ptr
 //   br label %end_write
 //
 // end_write:                                        ; preds = %null, %non_null
 //   ; ----- end CodegenAnyVal::WriteToSlot() -------------------------------------------
-//   %4 = getelementptr %"class.impala::ExprContext"** %materialize_expr_ctxs, i32 1
-//   %expr_ctx1 = load %"class.impala::ExprContext"** %4
-//   %src2 = call { i64, i8* } @GetSlotRef5(%"class.impala::ExprContext"* %expr_ctx1,
-//       %"class.impala::TupleRow"* %row)
-//   ; ----- generated by CodegenAnyVal::WriteToSlot() ----------------------------------
-//   %5 = extractvalue { i64, i8* } %src2, 0
-//   %is_null5 = trunc i64 %5 to i1
-//   br i1 %is_null5, label %null4, label %non_null3
-//
-// non_null3:                                        ; preds = %end_write
-//   %slot7 = getelementptr inbounds { i8, i32, %"struct.impala::StringValue" }* %tuple,
-//       i32 0, i32 2
-//   %6 = extractvalue { i64, i8* } %src2, 0
-//   %7 = ashr i64 %6, 32
-//   %8 = trunc i64 %7 to i32
-//   %9 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %8, 1
-//   %new_ptr = call i8* @_ZN6impala7MemPool8AllocateILb0EEEPhi(
-//       %"class.impala::MemPool"* inttoptr (i64 159661008 to %"class.impala::MemPool"*),
-//       i32 %8)
-//   %src8 = extractvalue { i64, i8* } %src2, 1
-//   call void @llvm.memcpy.p0i8.p0i8.i32(i8* %new_ptr, i8* %src8, i32 %8, i32 0,
-//       i1 false)
-//   %10 = insertvalue %"struct.impala::StringValue" %9, i8* %new_ptr, 0
-//   store %"struct.impala::StringValue" %10, %"struct.impala::StringValue"* %slot7
-//   br label %end_write6
-//
-// null4:                                            ; preds = %end_write
-//   call void @SetNull7({ i8, i32, %"struct.impala::StringValue" }* %tuple)
-//   br label %end_write6
-//
-// end_write6:                                       ; preds = %null4, %non_null3
-//   ; ----- end CodegenAnyVal::WriteToSlot() -------------------------------------------
 //   ret void
 // }
 Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_vals,
     const TupleDescriptor& desc, const vector<ExprContext*>& materialize_expr_ctxs,
-    MemPool* pool, Function** fn) {
-  DCHECK(!collect_string_vals) << "CodegenMaterializeExprs: collect_string_vals NYI";
+    bool use_mem_pool, Function** fn) {
+  // Only support 'collect_string_vals' == false for now.
+  if (collect_string_vals) {
+    return Status("CodegenMaterializeExprs() collect_string_vals == true NYI");
+  }
   SCOPED_TIMER(codegen->codegen_timer());
   LLVMContext& context = codegen->context();
 
@@ -356,7 +343,8 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_
   Value* row_arg = args[1];
   // Value* desc_arg = args[2]; // unused
   Value* expr_ctxs_arg = args[3];
-  // Value* pool_arg = args[4]; // unused
+  Value* pool_arg = args[4];
+  // The following arguments are unused as 'collect_string_vals' is false.
   // Value* non_null_string_values_arg = args[5]; // unused
   // Value* total_string_lengths_arg = args[6]; // unused
   // Value* num_non_null_string_values_arg = args[7]; // unused
@@ -386,7 +374,7 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_
         materialize_expr_fns[i], expr_args, "src");
 
     // Write expr result 'src' to slot
-    src.WriteToSlot(*slot_desc, tuple, pool);
+    src.WriteToSlot(*slot_desc, tuple, use_mem_pool ? pool_arg : nullptr);
   }
   builder.CreateRetVoid();
   // TODO: if pool != NULL, OptimizeFunctionWithExprs() is inlining the Allocate()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e78d71e6/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 82efc24..cf278ae 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -118,16 +118,17 @@ class Tuple {
   ///
   /// If non-NULL, 'pool' is used to allocate var-length data, otherwise var-length data
   /// isn't copied. (Memory for this tuple itself must already be allocated.) 'NULL_POOL'
-  /// should be true if 'pool' is NULL and false otherwise. The template parameter serves
+  /// must be true if 'pool' is NULL and false otherwise. The template parameter serves
   /// only to differentiate the NULL vs. non-NULL pool cases when we replace the function
   /// calls during codegen; the parameter means there are two different function symbols.
+  /// Callers of CodegenMaterializeExprs must set 'use_mem_pool' to true to generate the
+  /// IR function for the case 'pool' is non-NULL and false for the NULL pool case.
   ///
   /// If 'COLLECT_STRING_VALS' is true, the materialized non-NULL string value slots and
   /// the total length of the string slots are returned in 'non_null_string_values' and
   /// 'total_string_lengths'. 'non_null_string_values' and 'total_string_lengths' must be
   /// non-NULL in this case. 'non_null_string_values' does not need to be empty; its
   /// original contents will be overwritten.
-
   /// TODO: this function does not collect other var-len types such as collections.
   template <bool COLLECT_STRING_VALS, bool NULL_POOL>
   inline void IR_ALWAYS_INLINE MaterializeExprs(TupleRow* row,
@@ -160,16 +161,17 @@ class Tuple {
   static const char* MATERIALIZE_EXPRS_NULL_POOL_SYMBOL;
 
   /// Generates an IR version of MaterializeExprs(), returned in 'fn'. Currently only
-  /// 'collect_string_vals' = false is implemented.
+  /// 'collect_string_vals' = false is implemented and some arguments passed to the IR
+  /// function are unused.
   ///
-  /// 'pool' may be NULL, in which case no pool-related code is generated. Otherwise
-  /// 'pool's address is used directly in the IR. Note that this requires generating
-  /// separate functions for the non-NULL and NULL cases, i.e., the 'pool' argument of the
-  /// generated function is ignored. There are two different MaterializeExprs symbols to
-  /// differentiate these cases when we replace the function calls during codegen.
+  /// If 'use_mem_pool' is true, any varlen data will be copied into the MemPool specified
+  /// in the 'pool' argument of the generated function. Otherwise, the varlen data won't
+  /// be copied. There are two different MaterializeExprs symbols to differentiate between
+  /// these cases when we replace the function calls during codegen. Please see comment
+  /// of MaterializeExprs() for details.
   static Status CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_vals,
       const TupleDescriptor& desc, const vector<ExprContext*>& materialize_expr_ctxs,
-      MemPool* pool, llvm::Function** fn);
+      bool use_mem_pool, llvm::Function** fn);
 
   /// Turn null indicator bit on. For non-nullable slots, the mask will be 0 and
   /// this is a no-op (but we don't have to branch to check is slots are nullable).


[03/10] incubator-impala git commit: IMPALA-5125: SimplifyConditionalsRule incorrectly handles aggregates

Posted by kw...@apache.org.
IMPALA-5125: SimplifyConditionalsRule incorrectly handles aggregates

This patch addresses 3 issues:
- SelectList.reset() didn't properly reset some of its members, though
  they're documented as needing to be reset. This was causing a crash
  when the Planner attempted to make an aggregation node for an agg
  function that had been eliminated by expr rewriting. While I'm here,
  I added resetting of all of SelectList's members that need to be
  reset, and fixed the documentation of one member that shouldn't be
  reset.
- SimplifyConditionalsRule was changing the meaning of queries that
  contain agg functions, e.g. because "select if(true, 0, sum(id))"
  is not equivalent to "select 0". The fix is to not return the
  simplified expr if it removes all aggregates.
- ExprRewriteRulesTest was performing rewrites on the result exprs of
  the SelectStmt, which causes problems if the result exprs have been
  substituted. In normal query execution, we don't rewrite the result
  exprs anyway, so the fix is to match normal query execution and
  rewrite the select list exprs.
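
The SimplifyConditionalsRule fix described above can be sketched in
isolation. This is a minimal illustration, not Impala's code: the Node
class, simplifyIf() and apply() are hypothetical stand-ins for Expr, the
rule's simplification helpers and ExprRewriteRule.apply(). The point is
only the guard: a rewrite that removes every aggregate is discarded,
since an aggregate query without grouping returns a single row while
"select 0 from t" returns one row per input row.

```java
import java.util.Arrays;
import java.util.List;

public class AggregateGuardSketch {
  /** Hypothetical, minimal expression node; not Impala's Expr class. */
  static final class Node {
    final String name;
    final boolean isAggregate;
    final List<Node> children;

    Node(String name, boolean isAggregate, Node... children) {
      this.name = name;
      this.isAggregate = isAggregate;
      this.children = Arrays.asList(children);
    }

    /** True if this node or any descendant is an aggregate function call. */
    boolean containsAggregate() {
      if (isAggregate) return true;
      for (Node child : children) {
        if (child.containsAggregate()) return true;
      }
      return false;
    }

    /** Renders leaves as their name and calls as name(arg, ...). */
    String toSql() {
      if (children.isEmpty()) return name;
      StringBuilder sb = new StringBuilder(name).append("(");
      for (int i = 0; i < children.size(); ++i) {
        if (i > 0) sb.append(", ");
        sb.append(children.get(i).toSql());
      }
      return sb.append(")").toString();
    }
  }

  /** Stand-in simplification: if(true, a, b) -> a and if(false, a, b) -> b. */
  static Node simplifyIf(Node expr) {
    if (!expr.name.equals("if") || expr.children.size() != 3) return expr;
    String cond = expr.children.get(0).name;
    if (cond.equals("true")) return expr.children.get(1);
    if (cond.equals("false")) return expr.children.get(2);
    return expr;
  }

  /** Simplifies 'expr', but keeps the original if all aggregates would vanish. */
  static Node apply(Node expr) {
    Node simplified = simplifyIf(expr);
    if (expr != simplified && expr.containsAggregate()
        && !simplified.containsAggregate()) {
      return expr;
    }
    return simplified;
  }

  public static void main(String[] args) {
    Node id = new Node("id", false);
    Node kept = new Node("if", false, new Node("true", false),
        new Node("0", false), new Node("sum", true, id));
    Node rewritten = new Node("if", false, new Node("false", false),
        new Node("max", true, id), new Node("min", true, id));
    System.out.println(apply(kept).toSql());
    System.out.println(apply(rewritten).toSql());
  }
}
```

The first expression is left untouched because simplifying it would drop
sum(id); the second is still simplified because min(id) survives.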

Testing:
- Added e2e test to exprs.test.
- Added unit test to ExprRewriteRulesTest.

Change-Id: Ic20b1621753980b47a612e0885804363b733f6da
Reviewed-on: http://gerrit.cloudera.org:8080/6653
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/915a1634
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/915a1634
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/915a1634

Branch: refs/heads/master
Commit: 915a16345c9325f29cad2a4c113d960e434b4ba7
Parents: c1463ff
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Fri Apr 14 12:36:46 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Apr 24 21:41:11 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/SelectStmt.java   | 13 ++++++++-----
 .../impala/rewrite/SimplifyConditionalsRule.java | 19 +++++++++++++++----
 .../impala/analysis/ExprRewriteRulesTest.java    | 11 ++++++++++-
 .../queries/QueryTest/exprs.test                 |  8 ++++++++
 4 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915a1634/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index e3f4e6a..c146247 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -68,10 +68,6 @@ public class SelectStmt extends QueryStmt {
   // set if we have AnalyticExprs in the select list/order by clause
   private AnalyticInfo analyticInfo_;
 
-  // SQL string of this SelectStmt before inline-view expression substitution.
-  // Set in analyze().
-  protected String sqlString_;
-
   // substitutes all exprs in this select block to reference base tables
   // directly
   private ExprSubstitutionMap baseTblSmap_ = new ExprSubstitutionMap();
@@ -79,6 +75,10 @@ public class SelectStmt extends QueryStmt {
   // END: Members that need to be reset()
   /////////////////////////////////////////
 
+  // SQL string of this SelectStmt before inline-view expression substitution.
+  // Set in analyze().
+  protected String sqlString_;
+
   SelectStmt(SelectList selectList,
              FromClause fromClause,
              Expr wherePredicate, ArrayList<Expr> groupingExprs,
@@ -1029,10 +1029,13 @@ public class SelectStmt extends QueryStmt {
     selectList_.reset();
     colLabels_.clear();
     fromClause_.reset();
-    baseTblSmap_.clear();
     if (whereClause_ != null) whereClause_.reset();
     if (groupingExprs_ != null) Expr.resetList(groupingExprs_);
     if (havingClause_ != null) havingClause_.reset();
+    havingPred_ = null;
+    aggInfo_ = null;
+    analyticInfo_ = null;
+    baseTblSmap_.clear();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915a1634/fe/src/main/java/org/apache/impala/rewrite/SimplifyConditionalsRule.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/rewrite/SimplifyConditionalsRule.java b/fe/src/main/java/org/apache/impala/rewrite/SimplifyConditionalsRule.java
index 8c1de39..176b89e 100644
--- a/fe/src/main/java/org/apache/impala/rewrite/SimplifyConditionalsRule.java
+++ b/fe/src/main/java/org/apache/impala/rewrite/SimplifyConditionalsRule.java
@@ -51,14 +51,25 @@ public class SimplifyConditionalsRule implements ExprRewriteRule {
   public Expr apply(Expr expr, Analyzer analyzer) throws AnalysisException {
     if (!expr.isAnalyzed()) return expr;
 
+    Expr simplified;
     if (expr instanceof FunctionCallExpr) {
-      return simplifyFunctionCallExpr((FunctionCallExpr) expr);
+      simplified = simplifyFunctionCallExpr((FunctionCallExpr) expr);
     } else if (expr instanceof CompoundPredicate) {
-      return simplifyCompoundPredicate((CompoundPredicate) expr);
+      simplified = simplifyCompoundPredicate((CompoundPredicate) expr);
     } else if (expr instanceof CaseExpr) {
-      return simplifyCaseExpr((CaseExpr) expr, analyzer);
+      simplified = simplifyCaseExpr((CaseExpr) expr, analyzer);
+    } else {
+      return expr;
     }
-    return expr;
+
+    // IMPALA-5125: We can't eliminate aggregates as this may change the meaning of the
+    // query, for example:
+    // 'select if (true, 0, sum(id)) from alltypes' != 'select 0 from alltypes'
+    if (expr != simplified && expr.contains(Expr.isAggregatePredicate())
+        && !simplified.contains(Expr.isAggregatePredicate())) {
+      return expr;
+    }
+    return simplified;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915a1634/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
index 41dbeb8..3ee4141 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
@@ -51,7 +51,7 @@ public class ExprRewriteRulesTest extends FrontendTestBase {
     SelectStmt stmt = (SelectStmt) ParsesOk(stmtStr);
     Analyzer analyzer = createAnalyzer(Catalog.DEFAULT_DB);
     stmt.analyze(analyzer);
-    Expr origExpr = stmt.getResultExprs().get(0);
+    Expr origExpr = stmt.getSelectList().getItems().get(0).getExpr();
     String origSql = origExpr.toSql();
     ExprRewriter rewriter = new ExprRewriter(rules);
     Expr rewrittenExpr = rewriter.rewrite(origExpr, analyzer);
@@ -324,6 +324,15 @@ public class ExprRewriteRulesTest extends FrontendTestBase {
     RewritesOk("decode(id, null, 0, 1)", rules, null);
     // All non-constant, don't rewrite.
     RewritesOk("decode(id, 1, 1, 2, 2)", rules, null);
+
+    // IMPALA-5125: Exprs containing aggregates should not be rewritten if the rewrite
+    // eliminates all aggregates.
+    RewritesOk("if(true, 0, sum(id))", rule, null);
+    RewritesOk("if(false, max(id), min(id))", rule, "min(id)");
+    RewritesOk("true || sum(id) = 0", rule, null);
+    RewritesOk("case when true then 0 when false then sum(id) end", rule, null);
+    RewritesOk(
+        "case when true then count(id) when false then sum(id) end", rule, "count(id)");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915a1634/testdata/workloads/functional-query/queries/QueryTest/exprs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exprs.test b/testdata/workloads/functional-query/queries/QueryTest/exprs.test
index fa3d225..d618a24 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/exprs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/exprs.test
@@ -2733,4 +2733,12 @@ select distinct case when true then id else 0 end from functional.alltypestiny
 7
 ---- TYPES
 INT
+====
+---- QUERY
+# IMPALA-5125: test behavior when an agg function could be eliminated by expr rewrites.
+select if (true, 0, sum(id)) from functional.alltypestiny
+---- RESULTS
+0
+---- TYPES
+BIGINT
 ====
\ No newline at end of file


[10/10] incubator-impala git commit: IMPALA-4731/IMPALA-397/IMPALA-4728: Materialize sort exprs

Posted by kw...@apache.org.
IMPALA-4731/IMPALA-397/IMPALA-4728: Materialize sort exprs

Previously, exprs used in sorts were evaluated lazily. This can
potentially be bad for performance if the exprs are expensive to
evaluate, and it can lead to crashes if the exprs are
non-deterministic, as this violates assumptions of our sorting
algorithm.

This patch addresses these issues by materializing ordering exprs.
It does so when the expr is non-deterministic (including when it
contains a UDF, since we cannot currently tell whether a UDF is
deterministic), or when its cost exceeds a threshold (or the cost
is unknown).
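
The materialization decision described above can be sketched as a
single predicate. This is a simplified illustration under stated
assumptions, not the patch itself: OrderingExpr is a hypothetical
stand-in for Expr, the threshold value 10 is made up (the patch ties
the threshold to Expr.FUNCTION_CALL_COST), and an unknown cost is
modelled here as a null Float.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SortMaterializationSketch {
  // Assumed value for illustration only; not the constant used by Impala.
  static final float COST_THRESHOLD = 10.0f;

  /** Hypothetical summary of an ordering expr; not Impala's Expr class. */
  static final class OrderingExpr {
    final String sql;
    final Float cost;  // null means the cost is unknown
    final boolean hasNondeterministicBuiltin;
    final boolean hasUdf;

    OrderingExpr(String sql, Float cost, boolean hasNondeterministicBuiltin,
        boolean hasUdf) {
      this.sql = sql;
      this.cost = cost;
      this.hasNondeterministicBuiltin = hasNondeterministicBuiltin;
      this.hasUdf = hasUdf;
    }
  }

  /**
   * True if the expr should be evaluated once and stored in the sort tuple:
   * its cost is unknown or above the threshold, or it may be non-deterministic.
   */
  static boolean shouldMaterialize(OrderingExpr e) {
    return e.cost == null
        || e.cost > COST_THRESHOLD
        || e.hasNondeterministicBuiltin
        || e.hasUdf;
  }

  /** Returns the SQL of the ordering exprs that would be materialized. */
  static List<String> materialized(List<OrderingExpr> exprs) {
    List<String> result = new ArrayList<>();
    for (OrderingExpr e : exprs) {
      if (shouldMaterialize(e)) result.add(e.sql);
    }
    return result;
  }

  public static void main(String[] args) {
    List<OrderingExpr> exprs = Arrays.asList(
        new OrderingExpr("int_col + int_col", 3.0f, false, false),
        new OrderingExpr("rand()", 1.0f, true, false),
        new OrderingExpr("my_udf(double_col)", 1.0f, false, true),
        new OrderingExpr("concat('ab', string_col)", 12.0f, false, false),
        new OrderingExpr("unknown_cost_expr", null, false, false));
    System.out.println(materialized(exprs));
  }
}
```

Only the cheap, deterministic 'int_col + int_col' stays lazily
evaluated in this sketch; every other expr is materialized.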

Testing:
- Added e2e tests in test_sort.py.
- Updated planner tests.

Change-Id: Ifefdaff8557a30ac44ea82ed428e6d1ffbca2e9e
Reviewed-on: http://gerrit.cloudera.org:8080/6322
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6cddb952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6cddb952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6cddb952

Branch: refs/heads/master
Commit: 6cddb952cefedd373b2a1ce71a1b3cff2e774d70
Parents: 42ca45e
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Tue Jan 31 10:33:07 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Apr 26 22:34:04 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/impala/analysis/Expr.java   |  13 +-
 .../impala/analysis/FunctionCallExpr.java       |   6 +-
 .../org/apache/impala/analysis/QueryStmt.java   |   6 +-
 .../org/apache/impala/analysis/SortInfo.java    | 107 +++++++++---
 .../apache/impala/planner/AnalyticPlanner.java  |   8 +-
 .../org/apache/impala/planner/SortNode.java     |  11 ++
 .../org/apache/impala/planner/PlannerTest.java  |   9 +
 .../queries/PlannerTest/constant-folding.test   |   1 +
 .../PlannerTest/sort-expr-materialization.test  | 169 +++++++++++++++++++
 tests/query_test/test_sort.py                   |  47 +++++-
 10 files changed, 340 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cddb952/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 1a1c92b..e28ab48 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -168,8 +168,17 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
       new com.google.common.base.Predicate<Expr>() {
         @Override
         public boolean apply(Expr arg) {
-          return arg instanceof FunctionCallExpr &&
-             !((FunctionCallExpr)arg).isNondeterministicBuiltinFn();
+          return arg instanceof FunctionCallExpr
+              && ((FunctionCallExpr) arg).isNondeterministicBuiltinFn();
+        }
+      };
+
+  public final static com.google.common.base.Predicate<Expr> IS_UDF_PREDICATE =
+      new com.google.common.base.Predicate<Expr>() {
+        @Override
+        public boolean apply(Expr arg) {
+          return arg instanceof FunctionCallExpr
+              && !((FunctionCallExpr) arg).getFnName().isBuiltin();
         }
       };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cddb952/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index 1e06254..5895326 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -234,9 +234,9 @@ public class FunctionCallExpr extends Expr {
   static boolean isNondeterministicBuiltinFnName(String fnName) {
     if (fnName.equalsIgnoreCase("rand") || fnName.equalsIgnoreCase("random")
         || fnName.equalsIgnoreCase("uuid")) {
-      return false;
+      return true;
     }
-    return true;
+    return false;
   }
 
   /**
@@ -280,7 +280,7 @@ public class FunctionCallExpr extends Expr {
       fnName = path.get(path.size() - 1);
     }
     // Non-deterministic functions are never constant.
-    if (!isNondeterministicBuiltinFnName(fnName)) {
+    if (isNondeterministicBuiltinFnName(fnName)) {
       return false;
     }
     // Sleep is a special function for testing.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cddb952/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index 135f2e4..69b9625 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -250,8 +250,10 @@ public abstract class QueryStmt extends StatementBase {
     ExprSubstitutionMap smap = sortInfo_.createSortTupleInfo(resultExprs_, analyzer);
 
     for (int i = 0; i < smap.size(); ++i) {
-      Preconditions.checkState(smap.getLhs().get(i) instanceof SlotRef);
-      Preconditions.checkState(smap.getRhs().get(i) instanceof SlotRef);
+      if (!(smap.getLhs().get(i) instanceof SlotRef)
+          || !(smap.getRhs().get(i) instanceof SlotRef)) {
+        continue;
+      }
       SlotRef inputSlotRef = (SlotRef) smap.getLhs().get(i);
       SlotRef outputSlotRef = (SlotRef) smap.getRhs().get(i);
       if (hasLimit()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cddb952/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index 31f4d18..6c46231 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -18,7 +18,9 @@
 package org.apache.impala.analysis;
 import org.apache.impala.common.TreeNode;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
@@ -34,10 +36,22 @@ import com.google.common.collect.Sets;
  * particular input row (materialize all row slots)
  */
 public class SortInfo {
+  // All ordering exprs with cost greater than this will be materialized. Since we don't
+  // currently have any information about actual function costs, this value is intended to
+  // ensure that all expensive functions will be materialized while still leaving simple
+  // operations unmaterialized, for example 'SlotRef + SlotRef' should have a cost below
+  // this threshold.
+  // TODO: rethink this when we have a better cost model.
+  private static final float SORT_MATERIALIZATION_COST_THRESHOLD =
+      Expr.FUNCTION_CALL_COST;
+
   private List<Expr> orderingExprs_;
   private final List<Boolean> isAscOrder_;
   // True if "NULLS FIRST", false if "NULLS LAST", null if not specified.
   private final List<Boolean> nullsFirstParams_;
+  // Subset of ordering exprs that are materialized. Populated in
+  // createMaterializedOrderExprs(), used for EXPLAIN output.
+  private List<Expr> materializedOrderingExprs_;
   // The single tuple that is materialized, sorted, and output by a sort operator
   // (i.e. SortNode or TopNNode)
   private TupleDescriptor sortTupleDesc_;
@@ -52,6 +66,7 @@ public class SortInfo {
     orderingExprs_ = orderingExprs;
     isAscOrder_ = isAscOrder;
     nullsFirstParams_ = nullsFirstParams;
+    materializedOrderingExprs_ = Lists.newArrayList();
   }
 
   /**
@@ -61,6 +76,7 @@ public class SortInfo {
     orderingExprs_ = Expr.cloneList(other.orderingExprs_);
     isAscOrder_ = Lists.newArrayList(other.isAscOrder_);
     nullsFirstParams_ = Lists.newArrayList(other.nullsFirstParams_);
+    materializedOrderingExprs_ = Expr.cloneList(other.materializedOrderingExprs_);
     sortTupleDesc_ = other.sortTupleDesc_;
     if (other.sortTupleSlotExprs_ != null) {
       sortTupleSlotExprs_ = Expr.cloneList(other.sortTupleSlotExprs_);
@@ -85,6 +101,7 @@ public class SortInfo {
   public List<Expr> getOrderingExprs() { return orderingExprs_; }
   public List<Boolean> getIsAscOrder() { return isAscOrder_; }
   public List<Boolean> getNullsFirstParams() { return nullsFirstParams_; }
+  public List<Expr> getMaterializedOrderingExprs() { return materializedOrderingExprs_; }
   public List<Expr> getSortTupleSlotExprs() { return sortTupleSlotExprs_; }
   public TupleDescriptor getSortTupleDescriptor() { return sortTupleDesc_; }
 
@@ -93,6 +110,7 @@ public class SortInfo {
    * of asc/desc.
    */
   public List<Boolean> getNullsFirst() {
+    Preconditions.checkState(orderingExprs_.size() == nullsFirstParams_.size());
     List<Boolean> nullsFirst = Lists.newArrayList();
     for (int i = 0; i < orderingExprs_.size(); ++i) {
       nullsFirst.add(OrderByElement.nullsFirst(nullsFirstParams_.get(i),
@@ -146,42 +164,51 @@ public class SortInfo {
 
   /**
    * Create a tuple descriptor for the single tuple that is materialized, sorted, and
-   * output by the sort node. Done by materializing slot refs in the order-by and given
-   * result expressions. Those slot refs in the ordering and result exprs are substituted
-   * with slot refs into the new tuple. This simplifies the sorting logic for total and
-   * top-n sorts. The substitution map is returned.
-   * TODO: We could do something more sophisticated than simply copying input slot refs -
-   * e.g. compute some order-by expressions.
+   * output by the sort node. Materializes slots required by 'resultExprs' as well as
+   * non-deterministic and expensive order by exprs. The materialized exprs are
+   * substituted with slot refs into the new tuple. This simplifies the sorting logic for
+   * total and top-n sorts. The substitution map is returned.
    */
   public ExprSubstitutionMap createSortTupleInfo(
       List<Expr> resultExprs, Analyzer analyzer) {
-    // sourceSlots contains the slots from the sort input to materialize.
-    Set<SlotRef> sourceSlots = Sets.newHashSet();
-
-    TreeNode.collect(resultExprs, Predicates.instanceOf(SlotRef.class), sourceSlots);
-    TreeNode.collect(orderingExprs_, Predicates.instanceOf(SlotRef.class), sourceSlots);
-
     // The descriptor for the tuples on which the sort operates.
     TupleDescriptor sortTupleDesc = analyzer.getDescTbl().createTupleDescriptor("sort");
     sortTupleDesc.setIsMaterialized(true);
-
     List<Expr> sortTupleExprs = Lists.newArrayList();
 
-    // substOrderBy is the mapping from slot refs in the sort node's input to slot refs in
-    // the materialized sort tuple. Each slot ref in the input gets cloned and builds up
-    // the tuple operated on and returned by the sort node.
-    ExprSubstitutionMap substOrderBy = new ExprSubstitutionMap();
+    // substOrderBy is a mapping from exprs evaluated on the sort input that get
+    // materialized into the sort tuple to their corresponding SlotRefs in the sort tuple.
+    // The following exprs are materialized:
+    // 1. Ordering exprs that we chose to materialize
+    // 2. SlotRefs against the sort input contained in the result and ordering exprs after
+    // substituting the materialized ordering exprs.
+
+    // Case 1:
+    ExprSubstitutionMap substOrderBy =
+        createMaterializedOrderExprs(sortTupleDesc, analyzer);
+    sortTupleExprs.addAll(substOrderBy.getLhs());
+
+    // Case 2: SlotRefs in the result and ordering exprs after substituting the
+    // materialized ordering exprs.
+    Set<SlotRef> sourceSlots = Sets.newHashSet();
+    TreeNode.collect(Expr.substituteList(resultExprs, substOrderBy, analyzer, false),
+        Predicates.instanceOf(SlotRef.class), sourceSlots);
+    TreeNode.collect(Expr.substituteList(orderingExprs_, substOrderBy, analyzer, false),
+        Predicates.instanceOf(SlotRef.class), sourceSlots);
     for (SlotRef origSlotRef: sourceSlots) {
-      SlotDescriptor origSlotDesc = origSlotRef.getDesc();
-      SlotDescriptor materializedDesc =
-          analyzer.copySlotDescriptor(origSlotDesc, sortTupleDesc);
-      SlotRef cloneRef = new SlotRef(materializedDesc);
-      substOrderBy.put(origSlotRef, cloneRef);
-      sortTupleExprs.add(origSlotRef);
+      // Don't rematerialize slots that are already in the sort tuple.
+      if (origSlotRef.getDesc().getParent().getId() != sortTupleDesc.getId()) {
+        SlotDescriptor origSlotDesc = origSlotRef.getDesc();
+        SlotDescriptor materializedDesc =
+            analyzer.copySlotDescriptor(origSlotDesc, sortTupleDesc);
+        SlotRef cloneRef = new SlotRef(materializedDesc);
+        substOrderBy.put(origSlotRef, cloneRef);
+        sortTupleExprs.add(origSlotRef);
+      }
     }
 
-    // The ordering exprs still point to the old slot refs and need to be replaced with
-    // ones that point to the slot refs into the sort's output tuple.
+    // The ordering exprs are evaluated against the sort tuple, so they must reflect the
+    // materialization decision above.
     substituteOrderingExprs(substOrderBy, analyzer);
 
     // Update the tuple descriptor used to materialize the input of the sort.
@@ -189,4 +216,34 @@ public class SortInfo {
 
     return substOrderBy;
   }
+
+  /**
+   * Materialize ordering exprs by creating slots for them in 'sortTupleDesc' if they:
+   * - contain a non-deterministic expr
+   * - contain a UDF (since we don't know if they're deterministic)
+   * - are more expensive than a cost threshold
+   * - don't have a cost set
+   *
+   * Populates 'materializedOrderingExprs_' and returns a mapping from the original
+   * ordering exprs to the new SlotRefs. It is expected that this smap will be passed into
+   * substituteOrderingExprs() by the caller.
+   */
+  public ExprSubstitutionMap createMaterializedOrderExprs(
+      TupleDescriptor sortTupleDesc, Analyzer analyzer) {
+    ExprSubstitutionMap substOrderBy = new ExprSubstitutionMap();
+    for (Expr origOrderingExpr : orderingExprs_) {
+      if (!origOrderingExpr.hasCost()
+          || origOrderingExpr.getCost() > SORT_MATERIALIZATION_COST_THRESHOLD
+          || origOrderingExpr.contains(Expr.IS_NONDETERMINISTIC_BUILTIN_FN_PREDICATE)
+          || origOrderingExpr.contains(Expr.IS_UDF_PREDICATE)) {
+        SlotDescriptor materializedDesc = analyzer.addSlotDescriptor(sortTupleDesc);
+        materializedDesc.initFromExpr(origOrderingExpr);
+        materializedDesc.setIsMaterialized(true);
+        SlotRef materializedRef = new SlotRef(materializedDesc);
+        substOrderBy.put(origOrderingExpr, materializedRef);
+        materializedOrderingExprs_.add(origOrderingExpr);
+      }
+    }
+    return substOrderBy;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cddb952/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 6d726ec..08dd9f5 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -308,8 +308,12 @@ public class AnalyticPlanner {
       }
     }
 
-    SortInfo sortInfo = new SortInfo(
-        Expr.substituteList(sortExprs, sortSmap, analyzer_, false), isAsc, nullsFirst);
+    SortInfo sortInfo = new SortInfo(sortExprs, isAsc, nullsFirst);
+    ExprSubstitutionMap smap =
+        sortInfo.createMaterializedOrderExprs(sortTupleDesc, analyzer_);
+    sortSlotExprs.addAll(smap.getLhs());
+    sortSmap = ExprSubstitutionMap.combine(sortSmap, smap);
+    sortInfo.substituteOrderingExprs(sortSmap, analyzer_);
     if (LOG.isTraceEnabled()) {
       LOG.trace("sortinfo exprs: " + Expr.debugString(sortInfo.getOrderingExprs()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cddb952/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index ef05499..177565a 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -187,6 +187,17 @@ public class SortNode extends PlanNode {
       }
       output.append("\n");
     }
+
+    if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()
+        && info_.getMaterializedOrderingExprs().size() > 0) {
+      output.append(detailPrefix + "materialized: ");
+      for (int i = 0; i < info_.getMaterializedOrderingExprs().size(); ++i) {
+        if (i > 0) output.append(", ");
+        output.append(info_.getMaterializedOrderingExprs().get(i).toSql());
+      }
+      output.append("\n");
+    }
+
     return output.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cddb952/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 363c59c..80ba3b2 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -19,6 +19,7 @@ package org.apache.impala.planner;
 
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.testutil.TestUtils;
@@ -32,6 +33,7 @@ import org.junit.Assume;
 import org.junit.Test;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 // All planner tests, except for S3 specific tests should go here.
 public class PlannerTest extends PlannerTestBase {
@@ -379,4 +381,11 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile("resource-requirements", options, false);
   }
 
+  @Test
+  public void testSortExprMaterialization() {
+    addTestFunction("TestFn", Lists.newArrayList(Type.DOUBLE), false);
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    runPlannerTestFile("sort-expr-materialization", options);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cddb952/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 7effd9b..0a30a97 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -261,6 +261,7 @@ PLAN-ROOT SINK
 |
 01:SORT
 |  order by: concat('ab', string_col) ASC NULLS FIRST, greatest(20, bigint_col) ASC
+|  materialized: concat('ab', string_col), greatest(20, bigint_col)
 |  mem-estimate=16.00MB mem-reservation=48.00MB
 |  tuple-ids=3 row-size=29B cardinality=7300
 |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cddb952/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test b/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
new file mode 100644
index 0000000..b7e22a9
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
@@ -0,0 +1,169 @@
+# sort on a non-deterministic expr, gets materialized
+select * from functional.alltypes order by random()
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SORT
+|  order by: random() ASC
+|  materialized: random()
+|  mem-estimate=16.00MB mem-reservation=48.00MB
+|  tuple-ids=1 row-size=105B cardinality=7300
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   mem-estimate=128.00MB mem-reservation=0B
+   tuple-ids=0 row-size=97B cardinality=7300
+====
+# sort on a deterministic expr that exceeds the cost threshold
+select * from functional.alltypes order by abs(id) + abs(id)
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SORT
+|  order by: abs(id) + abs(id) ASC
+|  materialized: abs(id) + abs(id)
+|  mem-estimate=16.00MB mem-reservation=48.00MB
+|  tuple-ids=1 row-size=105B cardinality=7300
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   mem-estimate=128.00MB mem-reservation=0B
+   tuple-ids=0 row-size=97B cardinality=7300
+====
+# sort on a deterministic expr that doesn't exceed the cost threshold
+select * from functional.alltypes order by tinyint_col + 1
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SORT
+|  order by: tinyint_col + 1 ASC
+|  mem-estimate=16.00MB mem-reservation=48.00MB
+|  tuple-ids=1 row-size=97B cardinality=7300
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   mem-estimate=128.00MB mem-reservation=0B
+   tuple-ids=0 row-size=97B cardinality=7300
+====
+# sort on multiple exprs, subset is materialized
+select * from functional.alltypes
+order by dayofweek(timestamp_col), true, id + 1, string_col = date_string_col, id = tinyint_col
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SORT
+|  order by: dayofweek(timestamp_col) ASC, TRUE ASC, id + 1 ASC, string_col = date_string_col ASC, id = tinyint_col ASC
+|  materialized: dayofweek(timestamp_col), string_col = date_string_col
+|  mem-estimate=16.00MB mem-reservation=48.00MB
+|  tuple-ids=1 row-size=102B cardinality=7300
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   mem-estimate=128.00MB mem-reservation=0B
+   tuple-ids=0 row-size=97B cardinality=7300
+====
+# expensive analytic order by expr gets materialized
+select last_value(id) over (order by to_date(timestamp_col), bool_col is null)
+from functional.alltypes
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+02:ANALYTIC
+|  functions: last_value(id)
+|  order by: to_date(timestamp_col) ASC, bool_col IS NULL ASC
+|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  mem-estimate=0B mem-reservation=16.00MB
+|  tuple-ids=3,2 row-size=41B cardinality=7300
+|
+01:SORT
+|  order by: to_date(timestamp_col) ASC, bool_col IS NULL ASC
+|  materialized: to_date(timestamp_col)
+|  mem-estimate=16.00MB mem-reservation=48.00MB
+|  tuple-ids=3 row-size=37B cardinality=7300
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   mem-estimate=128.00MB mem-reservation=0B
+   tuple-ids=0 row-size=21B cardinality=7300
+====
+# expensive order by expr in top-n gets materialized
+select id from functional.alltypes order by string_col like 'a.*b', id * bigint_col,
+regexp_replace(string_col, 'a.*b', 'c') limit 10
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:TOP-N [LIMIT=10]
+|  order by: string_col LIKE 'a.*b' ASC, id * bigint_col ASC, regexp_replace(string_col, 'a.*b', 'c') ASC
+|  materialized: string_col LIKE 'a.*b', regexp_replace(string_col, 'a.*b', 'c')
+|  mem-estimate=290B mem-reservation=0B
+|  tuple-ids=1 row-size=29B cardinality=10
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   mem-estimate=128.00MB mem-reservation=0B
+   tuple-ids=0 row-size=29B cardinality=7300
+====
+# sort on udf, gets materialized
+select * from functional.alltypes order by TestFn(double_col)
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SORT
+|  order by: default.testfn(double_col) ASC
+|  materialized: default.testfn(double_col)
+|  mem-estimate=16.00MB mem-reservation=48.00MB
+|  tuple-ids=1 row-size=101B cardinality=7300
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   mem-estimate=128.00MB mem-reservation=0B
+   tuple-ids=0 row-size=97B cardinality=7300
+====
+# sort expr contains SlotRefs that don't need to be materialized separately
+select concat(date_string_col, string_col) c from functional.alltypes order by c
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SORT
+|  order by: concat(date_string_col, string_col) ASC
+|  materialized: concat(date_string_col, string_col)
+|  mem-estimate=16.00MB mem-reservation=48.00MB
+|  tuple-ids=1 row-size=16B cardinality=7300
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   table stats: 7300 rows total
+   column stats: all
+   mem-estimate=128.00MB mem-reservation=0B
+   tuple-ids=0 row-size=41B cardinality=7300
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cddb952/tests/query_test/test_sort.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index 228d25d..1b77b6f 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -17,11 +17,12 @@
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 
-def transpose_results(result):
+def transpose_results(result, map_fn=lambda x: x):
   """Given a query result (list of strings, each string represents a row), return a list
-    of columns, where each column is a list of strings."""
+    of columns, where each column is a list of strings. Optionally, map_fn can be provided
+    to be applied to every value, e.g. to convert the strings to their underlying types."""
   split_result = [row.split('\t') for row in result]
-  return [list(l) for l in zip(*split_result)]
+  return [map(map_fn, list(l)) for l in zip(*split_result)]
 
 class TestQueryFullSort(ImpalaTestSuite):
   """Test class to do functional validation of sorting when data is spilled to disk."""
@@ -154,3 +155,43 @@ class TestQueryFullSort(ImpalaTestSuite):
       query, exec_option, table_format=table_format).data)
     assert(result[0] == sorted(result[0]))
 
+class TestRandomSort(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional'
+
+  def test_order_by_random(self):
+    """Tests that 'order by random()' works as expected."""
+    # "order by random()" with different seeds should produce different orderings.
+    seed_query = "select * from functional.alltypestiny order by random(%s)"
+    results_seed0 = self.execute_query(seed_query % "0")
+    results_seed1 = self.execute_query(seed_query % "1")
+    assert results_seed0.data != results_seed1.data
+    assert sorted(results_seed0.data) == sorted(results_seed1.data)
+
+    # Include "random()" in the select list to check that it's sorted correctly.
+    results = transpose_results(self.execute_query(
+        "select random() as r from functional.alltypessmall order by r").data,
+        lambda x: float(x))
+    assert(results[0] == sorted(results[0]))
+
+    # Like above, but with a limit.
+    results = transpose_results(self.execute_query(
+        "select random() as r from functional.alltypes order by r limit 100").data,
+        lambda x: float(x))
+    assert(results[0] == sorted(results[0]))
+
+    # "order by random()" inside an inline view.
+    query = "select r from (select random() r from functional.alltypessmall) v order by r"
+    results = transpose_results(self.execute_query(query).data, lambda x: float(x))
+    assert (results[0] == sorted(results[0]))
+
+  def test_analytic_order_by_random(self):
+    """Tests that a window function over 'order by random()' works as expected."""
+    # Since we use the same random seed and a very small table, the following queries
+    # should be equivalent.
+    results = transpose_results(self.execute_query("select id from "
+        "functional.alltypestiny order by random(2)").data)
+    analytic_results = transpose_results(self.execute_query("select last_value(id) over "
+        "(order by random(2)) from functional.alltypestiny").data)
+    assert results == analytic_results
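
As a rough illustration of the test helper in this diff, the following standalone sketch shows what transpose_results returns and how map_fn changes the values. It is not part of the commit: the sample rows are made up, and the helper is rewritten with a list comprehension (an assumption on my part) so that it returns real lists under Python 3 as well, where map() is lazy.

```python
def transpose_results(result, map_fn=lambda x: x):
    """Turn tab-separated row strings into a list of columns,
    applying map_fn to every value."""
    split_result = [row.split('\t') for row in result]
    # Build real lists so each column compares equal to sorted(column)
    # under both Python 2 and Python 3.
    return [[map_fn(value) for value in column] for column in zip(*split_result)]

# Made-up rows standing in for the .data attribute of a query result.
rows = ["0.25\ta", "0.5\tb", "0.75\tc"]
columns = transpose_results(rows)

# Single-column result converted to floats, as the random() tests do.
single_column_rows = ["0.25", "0.5", "0.75"]
numeric_columns = transpose_results(single_column_rows, float)

# A sortedness check has to look at one column, not at the list of columns.
is_sorted = numeric_columns[0] == sorted(numeric_columns[0])
```

For a single-column result the returned list of columns has exactly one element, so a sortedness check is only meaningful when it indexes the column (results[0]) rather than comparing the whole return value with its sorted copy.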


[07/10] incubator-impala git commit: IMPALA-5147: Add the ability to exclude hosts from query execution

Posted by kw...@apache.org.
IMPALA-5147: Add the ability to exclude hosts from query execution

This commit introduces a new startup option, termed 'is_executor',
that determines whether an impalad process can execute query fragments.
The 'is_executor' option determines if a specific host will be included
in the scheduler's backend configuration and hence included in
scheduling decisions.
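
The effect on membership handling can be sketched roughly as follows. This is a simplified Python model, not the C++ in scheduler.cc: BackendDescriptor, update_executors and the host names are hypothetical stand-ins, and only the filtering on is_executor and the handling of delta versus full updates are modelled.

```python
from collections import namedtuple

# Hypothetical stand-in for the backend descriptor carried in the membership topic.
BackendDescriptor = namedtuple(
    "BackendDescriptor", ["address", "is_coordinator", "is_executor"])

def update_executors(current_executors, topic_entries, topic_deletions, is_delta=True):
    """Return the executor map after one membership update.

    current_executors maps backend id -> BackendDescriptor. Only entries with
    is_executor set are added; a non-delta update starts from an empty map.
    """
    executors = dict(current_executors) if is_delta else {}
    for backend_id, desc in topic_entries.items():
        if desc.is_executor:
            executors[backend_id] = desc
    for backend_id in topic_deletions:
        # Deleting an id that was never an executor is a no-op.
        executors.pop(backend_id, None)
    return executors

entries = {
    "host0:22000": BackendDescriptor("host0:22000", True, False),  # coordinator only
    "host1:22000": BackendDescriptor("host1:22000", True, True),
    "host2:22000": BackendDescriptor("host2:22000", True, True),
}
executors = update_executors({}, entries, [], is_delta=False)
after_removal = update_executors(executors, {}, ["host2:22000"])
```

In this model the coordinator-only host never enters the executor map, so it cannot be chosen when scan ranges are assigned, which mirrors the intent described above.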

Testing:
- Added a custom cluster test.
- Added a new scheduler test.

Change-Id: I5d2ff7f341c9d2b0649e4d14561077e166ad7c4d
Reviewed-on: http://gerrit.cloudera.org:8080/6628
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e2c53a8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e2c53a8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e2c53a8b

Branch: refs/heads/master
Commit: e2c53a8bdf646331b29c3de921b681d0d885c82e
Parents: 5809317
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Thu Apr 13 11:18:47 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Apr 26 01:45:40 2017 +0000

----------------------------------------------------------------------
 be/src/scheduling/scheduler-test-util.cc  |  13 +-
 be/src/scheduling/scheduler-test-util.h   |  15 +-
 be/src/scheduling/scheduler-test.cc       |  21 ++
 be/src/scheduling/scheduler.cc            | 285 ++++++++++++-------------
 be/src/scheduling/scheduler.h             | 220 ++++++++++---------
 be/src/service/impala-http-handler.cc     |  19 ++
 be/src/service/impala-http-handler.h      |  10 +
 be/src/service/impala-server.cc           |  24 ++-
 be/src/service/impala-server.h            |  28 ++-
 be/src/util/network-util.cc               |   2 +
 be/src/util/webserver.cc                  |  10 +-
 bin/start-impala-cluster.py               |  27 ++-
 common/thrift/StatestoreService.thrift    |  13 +-
 tests/common/custom_cluster_test_suite.py |   7 +-
 tests/custom_cluster/test_coordinators.py |  34 +++
 www/backends.tmpl                         |   8 +-
 www/root.tmpl                             |   3 +-
 17 files changed, 432 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 782379c..7547782 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -58,14 +58,14 @@ const string Cluster::IP_PREFIX = "10";
 /// Default size for new blocks is 1MB.
 const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20;
 
-int Cluster::AddHost(bool has_backend, bool has_datanode) {
+int Cluster::AddHost(bool has_backend, bool has_datanode, bool is_executor) {
   int host_idx = hosts_.size();
   int be_port = has_backend ? BACKEND_PORT : -1;
   int dn_port = has_datanode ? DATANODE_PORT : -1;
   IpAddr ip = HostIdxToIpAddr(host_idx);
   DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end());
   ip_to_idx_[ip] = host_idx;
-  hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port));
+  hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port, is_executor));
   // Add host to lists of backend indexes per type.
   if (has_backend) backend_host_idxs_.push_back(host_idx);
   if (has_datanode) {
@@ -79,8 +79,9 @@ int Cluster::AddHost(bool has_backend, bool has_datanode) {
   return host_idx;
 }
 
-void Cluster::AddHosts(int num_hosts, bool has_backend, bool has_datanode) {
-  for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode);
+void Cluster::AddHosts(int num_hosts, bool has_backend, bool has_datanode,
+    bool is_executor) {
+  for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode, is_executor);
 }
 
 Hostname Cluster::HostIdxToHostname(int host_idx) {
@@ -454,7 +455,7 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
 
   // Compute Assignment.
   FragmentScanRangeAssignment* assignment = result->AddAssignment();
-  return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0,
+  return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetExecutorsConfig(), 0,
       nullptr, false, plan_.scan_range_locations(), plan_.referenced_datanodes(),
       exec_at_coord, plan_.query_options(), nullptr, assignment);
 }
@@ -519,6 +520,8 @@ void SchedulerWrapper::AddHostToTopicDelta(const Host& host, TTopicDelta* delta)
   be_desc.address.hostname = host.ip;
   be_desc.address.port = host.be_port;
   be_desc.ip_address = host.ip;
+  be_desc.__set_is_coordinator(host.is_coordinator);
+  be_desc.__set_is_executor(host.is_executor);
 
   // Build topic item.
   TTopicItem item;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.h b/be/src/scheduling/scheduler-test-util.h
index 6cce021..c3d6b1c 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -86,13 +86,17 @@ enum class ReplicaPlacement {
 
 /// Host model. Each host can have either a backend, a datanode, or both. To specify that
 /// a host should not act as a backend or datanode specify '-1' as the respective port.
+/// A host with a backend is always a coordinator but it may not be an executor.
 struct Host {
-  Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port)
-    : name(name), ip(ip), be_port(be_port), dn_port(dn_port) {}
+  Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port, bool is_executor)
+    : name(name), ip(ip), be_port(be_port), dn_port(dn_port), is_coordinator(true),
+      is_executor(is_executor) {}
   Hostname name;
   IpAddr ip;
   int be_port; // Backend port
   int dn_port; // Datanode port
+  bool is_coordinator; // True if this is a coordinator host
+  bool is_executor; // True if this is an executor host
 };
 
 /// A cluster stores a list of hosts and provides various methods to add hosts to the
@@ -101,10 +105,13 @@ class Cluster {
  public:
   /// Add a host and return the host's index. 'hostname' and 'ip' of the new host will be
   /// generated and are guaranteed to be unique.
-  int AddHost(bool has_backend, bool has_datanode);
+  /// TODO: Refactor the construction of a host and its addition to a cluster to
+  /// avoid the boolean input parameters.
+  int AddHost(bool has_backend, bool has_datanode, bool is_executor = true);
 
   /// Add a number of hosts with the same properties by repeatedly calling AddHost(..).
-  void AddHosts(int num_hosts, bool has_backend, bool has_datanode);
+  void AddHosts(int num_hosts, bool has_backend, bool has_datanode,
+      bool is_executor = true);
 
   /// Convert a host index to a hostname.
   static Hostname HostIdxToHostname(int host_idx);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index 3e05c5b..c7b284b 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -52,6 +52,27 @@ TEST_F(SchedulerTest, SingleHostSingleFile) {
   EXPECT_EQ(0, result.NumCachedAssignments());
 }
 
+/// Test cluster configuration with one coordinator that can't process scan ranges.
+TEST_F(SchedulerTest, SingleCoordinatorNoExecutor) {
+  Cluster cluster;
+  cluster.AddHost(true, true, false);
+  cluster.AddHost(true, true, true);
+  cluster.AddHost(true, true, true);
+
+  Schema schema(cluster);
+  schema.AddMultiBlockTable("T1", 10, ReplicaPlacement::LOCAL_ONLY, 3);
+
+  Plan plan(schema);
+  plan.AddTableScan("T1");
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  scheduler.Compute(&result);
+
+  EXPECT_EQ(2, result.NumDistinctBackends());
+  EXPECT_EQ(0, result.NumDiskAssignments(0));
+}
+
 /// Test assigning all scan ranges to the coordinator.
 TEST_F(SchedulerTest, ExecAtCoord) {
   Cluster cluster;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index a73e13a..bca5965 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -29,7 +29,6 @@
 #include "common/logging.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/Types_types.h"
-#include "rapidjson/rapidjson.h"
 #include "runtime/exec-env.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/container-util.h"
@@ -41,7 +40,6 @@
 
 using boost::algorithm::join;
 using namespace apache::thrift;
-using namespace rapidjson;
 using namespace strings;
 
 DECLARE_int32(be_port);
@@ -54,15 +52,12 @@ static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
 static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
 
-static const string BACKENDS_WEB_PAGE = "/backends";
-static const string BACKENDS_TEMPLATE = "backends.tmpl";
-
 const string Scheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
 
 Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
     const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* webserver,
     RequestPoolService* request_pool_service)
-  : backend_config_(std::make_shared<const BackendConfig>()),
+  : executors_config_(std::make_shared<const BackendConfig>()),
     metrics_(metrics->GetOrCreateChildGroup("scheduler")),
     webserver_(webserver),
     statestore_subscriber_(subscriber),
@@ -77,7 +72,7 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
 
 Scheduler::Scheduler(const vector<TNetworkAddress>& backends, MetricGroup* metrics,
     Webserver* webserver, RequestPoolService* request_pool_service)
-  : backend_config_(std::make_shared<const BackendConfig>(backends)),
+  : executors_config_(std::make_shared<const BackendConfig>(backends)),
     metrics_(metrics),
     webserver_(webserver),
     statestore_subscriber_(nullptr),
@@ -109,13 +104,6 @@ Status Scheduler::Init() {
 
   coord_only_backend_config_.AddBackend(local_backend_descriptor_);
 
-  if (webserver_ != nullptr) {
-    Webserver::UrlCallback backends_callback =
-        bind<void>(mem_fn(&Scheduler::BackendsUrlCallback), this, _1, _2);
-    webserver_->RegisterUrlCallback(
-        BACKENDS_WEB_PAGE, BACKENDS_TEMPLATE, backends_callback);
-  }
-
   if (statestore_subscriber_ != nullptr) {
     StatestoreSubscriber::UpdateCallback cb =
         bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
@@ -128,8 +116,8 @@ Status Scheduler::Init() {
 
   if (metrics_ != nullptr) {
     // This is after registering with the statestored, so we already have to synchronize
-    // access to the backend_config_ shared_ptr.
-    int num_backends = GetBackendConfig()->NumBackends();
+    // access to the executors_config_ shared_ptr.
+    int num_backends = GetExecutorsConfig()->NumBackends();
     total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0);
     total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0);
     initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
@@ -152,20 +140,6 @@ Status Scheduler::Init() {
   return Status::OK();
 }
 
-void Scheduler::BackendsUrlCallback(
-    const Webserver::ArgumentMap& args, Document* document) {
-  BackendConfig::BackendList backends;
-  BackendConfigPtr backend_config = GetBackendConfig();
-  backend_config->GetAllBackends(&backends);
-  Value backends_list(kArrayType);
-  for (const TBackendDescriptor& backend : backends) {
-    Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator());
-    backends_list.PushBack(str, document->GetAllocator());
-  }
-
-  document->AddMember("backends", backends_list, document->GetAllocator());
-}
-
 void Scheduler::UpdateMembership(
     const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
     vector<TTopicDelta>* subscriber_topic_updates) {
@@ -177,26 +151,29 @@ void Scheduler::UpdateMembership(
   const TTopicDelta& delta = topic->second;
 
   // If the delta transmitted by the statestore is empty we can skip processing
-  // altogether and avoid making a copy of backend_config_.
+  // altogether and avoid making a copy of executors_config_.
   if (delta.is_delta && delta.topic_entries.empty() && delta.topic_deletions.empty()) {
     return;
   }
 
   // This function needs to handle both delta and non-delta updates. To minimize the
-  // time needed to hold locks, all updates are applied to a copy of backend_config_,
+  // time needed to hold locks, all updates are applied to a copy of
+  // executors_config_,
   // which is then swapped into place atomically.
-  std::shared_ptr<BackendConfig> new_backend_config;
+  std::shared_ptr<BackendConfig> new_executors_config;
 
   if (!delta.is_delta) {
-    current_membership_.clear();
-    new_backend_config = std::make_shared<BackendConfig>();
+    current_executors_.clear();
+    new_executors_config = std::make_shared<BackendConfig>();
   } else {
     // Make a copy
-    lock_guard<mutex> lock(backend_config_lock_);
-    new_backend_config = std::make_shared<BackendConfig>(*backend_config_);
+    lock_guard<mutex> lock(executors_config_lock_);
+    new_executors_config = std::make_shared<BackendConfig>(*executors_config_);
   }
 
-  // Process new entries to the topic
+  // Process new entries to the topic. Update executors_config_ and
+  // current_executors_ to match the set of executors given by the
+  // subscriber_topic_updates.
   for (const TTopicItem& item : delta.topic_entries) {
     TBackendDescriptor be_desc;
     // Benchmarks have suggested that this method can deserialize
@@ -225,42 +202,44 @@ void Scheduler::UpdateMembership(
                                << be_desc.address;
       continue;
     }
-    new_backend_config->AddBackend(be_desc);
-    current_membership_.insert(make_pair(item.key, be_desc));
+    if (be_desc.is_executor) {
+      new_executors_config->AddBackend(be_desc);
+      current_executors_.insert(make_pair(item.key, be_desc));
+    }
   }
 
   // Process deletions from the topic
   for (const string& backend_id : delta.topic_deletions) {
-    if (current_membership_.find(backend_id) != current_membership_.end()) {
-      new_backend_config->RemoveBackend(current_membership_[backend_id]);
-      current_membership_.erase(backend_id);
+    if (current_executors_.find(backend_id) != current_executors_.end()) {
+      new_executors_config->RemoveBackend(current_executors_[backend_id]);
+      current_executors_.erase(backend_id);
     }
   }
 
-  SetBackendConfig(new_backend_config);
+  SetExecutorsConfig(new_executors_config);
 
   if (metrics_ != nullptr) {
     /// TODO-MT: fix this (do we even need to report it?)
-    num_fragment_instances_metric_->set_value(current_membership_.size());
+    num_fragment_instances_metric_->set_value(current_executors_.size());
   }
 }
 
-Scheduler::BackendConfigPtr Scheduler::GetBackendConfig() const {
-  lock_guard<mutex> l(backend_config_lock_);
-  DCHECK(backend_config_.get() != nullptr);
-  BackendConfigPtr backend_config = backend_config_;
-  return backend_config;
+Scheduler::ExecutorsConfigPtr Scheduler::GetExecutorsConfig() const {
+  lock_guard<mutex> l(executors_config_lock_);
+  DCHECK(executors_config_.get() != nullptr);
+  ExecutorsConfigPtr executor_config = executors_config_;
+  return executor_config;
 }
 
-void Scheduler::SetBackendConfig(const BackendConfigPtr& backend_config) {
-  lock_guard<mutex> l(backend_config_lock_);
-  backend_config_ = backend_config;
+void Scheduler::SetExecutorsConfig(const ExecutorsConfigPtr& executors_config) {
+  lock_guard<mutex> l(executors_config_lock_);
+  executors_config_ = executors_config;
 }
 
 Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) {
   RuntimeProfile::Counter* total_assignment_timer =
       ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer");
-  BackendConfigPtr backend_config = GetBackendConfig();
+  ExecutorsConfigPtr executor_config = GetExecutorsConfig();
   const TQueryExecRequest& exec_request = schedule->request();
   for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
     for (const auto& entry : plan_exec_info.per_node_scan_ranges) {
@@ -283,7 +262,7 @@ Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) {
       FragmentScanRangeAssignment* assignment =
           &schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment;
       RETURN_IF_ERROR(
-          ComputeScanRangeAssignment(*backend_config, node_id, node_replica_preference,
+          ComputeScanRangeAssignment(*executor_config, node_id, node_replica_preference,
               node_random_replica, entry.second, exec_request.host_list, exec_at_coord,
               schedule->query_options(), total_assignment_timer, assignment));
       schedule->IncNumScanRanges(entry.second.size());
@@ -518,13 +497,13 @@ void Scheduler::CreateCollocatedInstances(
   }
 }
 
-Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& backend_config,
+Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& executor_config,
     PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
     bool node_random_replica, const vector<TScanRangeLocationList>& locations,
     const vector<TNetworkAddress>& host_list, bool exec_at_coord,
     const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
     FragmentScanRangeAssignment* assignment) {
-  if (backend_config.NumBackends() == 0 && !exec_at_coord) {
+  if (executor_config.NumBackends() == 0 && !exec_at_coord) {
     return Status(TErrorCode::NO_REGISTERED_BACKENDS);
   }
 
@@ -544,40 +523,40 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& backend_config
   // A preference attached to the plan node takes precedence.
   if (node_replica_preference) base_distance = *node_replica_preference;
 
-  // Between otherwise equivalent backends we optionally break ties by comparing their
+  // Between otherwise equivalent executors we optionally break ties by comparing their
   // random rank.
   bool random_replica = query_options.schedule_random_replica || node_random_replica;
 
   AssignmentCtx assignment_ctx(
-      exec_at_coord ? coord_only_backend_config_ : backend_config, total_assignments_,
+      exec_at_coord ? coord_only_backend_config_ : executor_config, total_assignments_,
       total_local_assignments_);
 
   vector<const TScanRangeLocationList*> remote_scan_range_locations;
 
-  // Loop over all scan ranges, select a backend for those with local impalads and collect
-  // all others for later processing.
+  // Loop over all scan ranges, select an executor for those with local impalads and
+  // collect all others for later processing.
   for (const TScanRangeLocationList& scan_range_locations : locations) {
     TReplicaPreference::type min_distance = TReplicaPreference::REMOTE;
 
-    // Select backend host for the current scan range.
+    // Select executor for the current scan range.
     if (exec_at_coord) {
-      DCHECK(assignment_ctx.backend_config().LookUpBackendIp(
+      DCHECK(assignment_ctx.executor_config().LookUpBackendIp(
           local_backend_descriptor_.address.hostname, nullptr));
       assignment_ctx.RecordScanRangeAssignment(local_backend_descriptor_, node_id,
           host_list, scan_range_locations, assignment);
     } else {
-      // Collect backend candidates with smallest memory distance.
-      vector<IpAddr> backend_candidates;
+      // Collect executor candidates with smallest memory distance.
+      vector<IpAddr> executor_candidates;
       if (base_distance < TReplicaPreference::REMOTE) {
         for (const TScanRangeLocation& location : scan_range_locations.locations) {
           const TNetworkAddress& replica_host = host_list[location.host_idx];
-          // Determine the adjusted memory distance to the closest backend for the replica
-          // host.
+          // Determine the adjusted memory distance to the closest executor for the
+          // replica host.
           TReplicaPreference::type memory_distance = TReplicaPreference::REMOTE;
-          IpAddr backend_ip;
-          bool has_local_backend = assignment_ctx.backend_config().LookUpBackendIp(
-              replica_host.hostname, &backend_ip);
-          if (has_local_backend) {
+          IpAddr executor_ip;
+          bool has_local_executor = assignment_ctx.executor_config().LookUpBackendIp(
+              replica_host.hostname, &executor_ip);
+          if (has_local_executor) {
             if (location.is_cached) {
               memory_distance = TReplicaPreference::CACHE_LOCAL;
             } else {
@@ -588,58 +567,58 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& backend_config
           }
           memory_distance = max(memory_distance, base_distance);
 
-          // We only need to collect backend candidates for non-remote reads, as it is the
-          // nature of remote reads that there is no backend available.
+          // We only need to collect executor candidates for non-remote reads, as it is
+          // the nature of remote reads that there is no executor available.
           if (memory_distance < TReplicaPreference::REMOTE) {
-            DCHECK(has_local_backend);
+            DCHECK(has_local_executor);
             // Check if we found a closer replica than the previous ones.
             if (memory_distance < min_distance) {
               min_distance = memory_distance;
-              backend_candidates.clear();
-              backend_candidates.push_back(backend_ip);
+              executor_candidates.clear();
+              executor_candidates.push_back(executor_ip);
             } else if (memory_distance == min_distance) {
-              backend_candidates.push_back(backend_ip);
+              executor_candidates.push_back(executor_ip);
             }
           }
         }
       } // End of candidate selection.
-      DCHECK(!backend_candidates.empty() || min_distance == TReplicaPreference::REMOTE);
+      DCHECK(!executor_candidates.empty() || min_distance == TReplicaPreference::REMOTE);
 
       // Check the effective memory distance of the candidates to decide whether to treat
       // the scan range as cached.
       bool cached_replica = min_distance == TReplicaPreference::CACHE_LOCAL;
 
-      // Pick backend host based on data location.
-      bool local_backend = min_distance != TReplicaPreference::REMOTE;
+      // Pick executor based on data location.
+      bool local_executor = min_distance != TReplicaPreference::REMOTE;
 
-      if (!local_backend) {
+      if (!local_executor) {
         remote_scan_range_locations.push_back(&scan_range_locations);
         continue;
       }
-      // For local reads we want to break ties by backend rank in these cases:
+      // For local reads we want to break ties by executor rank in these cases:
       // - if it is enforced via a query option.
       // - when selecting between cached replicas. In this case there is no OS buffer
       //   cache to worry about.
-      // Remote reads will always break ties by backend rank.
+      // Remote reads will always break ties by executor rank.
       bool decide_local_assignment_by_rank = random_replica || cached_replica;
-      const IpAddr* backend_ip = nullptr;
-      backend_ip = assignment_ctx.SelectLocalBackendHost(
-          backend_candidates, decide_local_assignment_by_rank);
-      TBackendDescriptor backend;
-      assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
+      const IpAddr* executor_ip = nullptr;
+      executor_ip = assignment_ctx.SelectLocalExecutor(
+          executor_candidates, decide_local_assignment_by_rank);
+      TBackendDescriptor executor;
+      assignment_ctx.SelectExecutorOnHost(*executor_ip, &executor);
       assignment_ctx.RecordScanRangeAssignment(
-          backend, node_id, host_list, scan_range_locations, assignment);
-    } // End of backend host selection.
+          executor, node_id, host_list, scan_range_locations, assignment);
+    } // End of executor selection.
   } // End of for loop over scan ranges.
 
-  // Assign remote scans to backends.
+  // Assign remote scans to executors.
   for (const TScanRangeLocationList* scan_range_locations : remote_scan_range_locations) {
     DCHECK(!exec_at_coord);
-    const IpAddr* backend_ip = assignment_ctx.SelectRemoteBackendHost();
-    TBackendDescriptor backend;
-    assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
+    const IpAddr* executor_ip = assignment_ctx.SelectRemoteExecutor();
+    TBackendDescriptor executor;
+    assignment_ctx.SelectExecutorOnHost(*executor_ip, &executor);
     assignment_ctx.RecordScanRangeAssignment(
-        backend, node_id, host_list, *scan_range_locations, assignment);
+        executor, node_id, host_list, *scan_range_locations, assignment);
   }
 
   if (VLOG_FILE_IS_ON) assignment_ctx.PrintAssignment(*assignment);
@@ -736,22 +715,22 @@ Status Scheduler::Schedule(QuerySchedule* schedule) {
   return Status::OK();
 }
 
-Scheduler::AssignmentCtx::AssignmentCtx(const BackendConfig& backend_config,
+Scheduler::AssignmentCtx::AssignmentCtx(const BackendConfig& executor_config,
     IntCounter* total_assignments, IntCounter* total_local_assignments)
-  : backend_config_(backend_config),
-    first_unused_backend_idx_(0),
+  : executors_config_(executor_config),
+    first_unused_executor_idx_(0),
     total_assignments_(total_assignments),
     total_local_assignments_(total_local_assignments) {
-  DCHECK_GT(backend_config.NumBackends(), 0);
-  backend_config.GetAllBackendIps(&random_backend_order_);
+  DCHECK_GT(executor_config.NumBackends(), 0);
+  executor_config.GetAllBackendIps(&random_executor_order_);
   std::mt19937 g(rand());
-  std::shuffle(random_backend_order_.begin(), random_backend_order_.end(), g);
-  // Initialize inverted map for backend rank lookups
+  std::shuffle(random_executor_order_.begin(), random_executor_order_.end(), g);
+  // Initialize inverted map for executor rank lookups
   int i = 0;
-  for (const IpAddr& ip : random_backend_order_) random_backend_rank_[ip] = i++;
+  for (const IpAddr& ip : random_executor_order_) random_executor_rank_[ip] = i++;
 }
 
-const IpAddr* Scheduler::AssignmentCtx::SelectLocalBackendHost(
+const IpAddr* Scheduler::AssignmentCtx::SelectLocalExecutor(
     const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) {
   DCHECK(!data_locations.empty());
   // List of candidate indexes into 'data_locations'.
@@ -759,9 +738,9 @@ const IpAddr* Scheduler::AssignmentCtx::SelectLocalBackendHost(
   // Find locations with minimum number of assigned bytes.
   int64_t min_assigned_bytes = numeric_limits<int64_t>::max();
   for (int i = 0; i < data_locations.size(); ++i) {
-    const IpAddr& backend_ip = data_locations[i];
+    const IpAddr& executor_ip = data_locations[i];
     int64_t assigned_bytes = 0;
-    auto handle_it = assignment_heap_.find(backend_ip);
+    auto handle_it = assignment_heap_.find(executor_ip);
     if (handle_it != assignment_heap_.end()) {
       assigned_bytes = (*handle_it->second).assigned_bytes;
     }
@@ -777,69 +756,69 @@ const IpAddr* Scheduler::AssignmentCtx::SelectLocalBackendHost(
   if (break_ties_by_rank) {
     min_rank_idx = min_element(candidates_idxs.begin(), candidates_idxs.end(),
         [&data_locations, this](const int& a, const int& b) {
-          return GetBackendRank(data_locations[a]) < GetBackendRank(data_locations[b]);
+          return GetExecutorRank(data_locations[a]) < GetExecutorRank(data_locations[b]);
         });
   }
   return &data_locations[*min_rank_idx];
 }
 
-const IpAddr* Scheduler::AssignmentCtx::SelectRemoteBackendHost() {
+const IpAddr* Scheduler::AssignmentCtx::SelectRemoteExecutor() {
   const IpAddr* candidate_ip;
-  if (HasUnusedBackends()) {
-    // Pick next unused backend.
-    candidate_ip = GetNextUnusedBackendAndIncrement();
+  if (HasUnusedExecutors()) {
+    // Pick next unused executor.
+    candidate_ip = GetNextUnusedExecutorAndIncrement();
   } else {
-    // Pick next backend from assignment_heap. All backends must have been inserted into
+    // Pick next executor from assignment_heap. All executors must have been inserted into
     // the heap at this point.
-    DCHECK_GT(backend_config_.NumBackends(), 0);
-    DCHECK_EQ(backend_config_.NumBackends(), assignment_heap_.size());
+    DCHECK_GT(executors_config_.NumBackends(), 0);
+    DCHECK_EQ(executors_config_.NumBackends(), assignment_heap_.size());
     candidate_ip = &(assignment_heap_.top().ip);
   }
   DCHECK(candidate_ip != nullptr);
   return candidate_ip;
 }
 
-bool Scheduler::AssignmentCtx::HasUnusedBackends() const {
-  return first_unused_backend_idx_ < random_backend_order_.size();
+bool Scheduler::AssignmentCtx::HasUnusedExecutors() const {
+  return first_unused_executor_idx_ < random_executor_order_.size();
 }
 
-const IpAddr* Scheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() {
-  DCHECK(HasUnusedBackends());
-  const IpAddr* ip = &random_backend_order_[first_unused_backend_idx_++];
+const IpAddr* Scheduler::AssignmentCtx::GetNextUnusedExecutorAndIncrement() {
+  DCHECK(HasUnusedExecutors());
+  const IpAddr* ip = &random_executor_order_[first_unused_executor_idx_++];
   return ip;
 }
 
-int Scheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const {
-  auto it = random_backend_rank_.find(ip);
-  DCHECK(it != random_backend_rank_.end());
+int Scheduler::AssignmentCtx::GetExecutorRank(const IpAddr& ip) const {
+  auto it = random_executor_rank_.find(ip);
+  DCHECK(it != random_executor_rank_.end());
   return it->second;
 }
 
-void Scheduler::AssignmentCtx::SelectBackendOnHost(
-    const IpAddr& backend_ip, TBackendDescriptor* backend) {
-  DCHECK(backend_config_.LookUpBackendIp(backend_ip, nullptr));
-  const BackendConfig::BackendList& backends_on_host =
-      backend_config_.GetBackendListForHost(backend_ip);
-  DCHECK(backends_on_host.size() > 0);
-  if (backends_on_host.size() == 1) {
-    *backend = *backends_on_host.begin();
+void Scheduler::AssignmentCtx::SelectExecutorOnHost(
+    const IpAddr& executor_ip, TBackendDescriptor* executor) {
+  DCHECK(executors_config_.LookUpBackendIp(executor_ip, nullptr));
+  const BackendConfig::BackendList& executors_on_host =
+      executors_config_.GetBackendListForHost(executor_ip);
+  DCHECK(executors_on_host.size() > 0);
+  if (executors_on_host.size() == 1) {
+    *executor = *executors_on_host.begin();
   } else {
-    BackendConfig::BackendList::const_iterator* next_backend_on_host;
-    next_backend_on_host =
-        FindOrInsert(&next_backend_per_host_, backend_ip, backends_on_host.begin());
-    DCHECK(find(backends_on_host.begin(), backends_on_host.end(), **next_backend_on_host)
-        != backends_on_host.end());
-    *backend = **next_backend_on_host;
+    BackendConfig::BackendList::const_iterator* next_executor_on_host;
+    next_executor_on_host =
+        FindOrInsert(&next_executor_per_host_, executor_ip, executors_on_host.begin());
+    DCHECK(find(executors_on_host.begin(), executors_on_host.end(),
+        **next_executor_on_host) != executors_on_host.end());
+    *executor = **next_executor_on_host;
     // Rotate
-    ++(*next_backend_on_host);
-    if (*next_backend_on_host == backends_on_host.end()) {
-      *next_backend_on_host = backends_on_host.begin();
+    ++(*next_executor_on_host);
+    if (*next_executor_on_host == executors_on_host.end()) {
+      *next_executor_on_host = executors_on_host.begin();
     }
   }
 }
 
 void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
-    const TBackendDescriptor& backend, PlanNodeId node_id,
+    const TBackendDescriptor& executor, PlanNodeId node_id,
     const vector<TNetworkAddress>& host_list,
     const TScanRangeLocationList& scan_range_locations,
     FragmentScanRangeAssignment* assignment) {
@@ -852,12 +831,12 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
     scan_range_length = 1000;
   }
 
-  IpAddr backend_ip;
-  bool ret = backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip);
+  IpAddr executor_ip;
+  bool ret = executors_config_.LookUpBackendIp(executor.address.hostname, &executor_ip);
   DCHECK(ret);
-  DCHECK(!backend_ip.empty());
+  DCHECK(!executor_ip.empty());
   assignment_heap_.InsertOrUpdate(
-      backend_ip, scan_range_length, GetBackendRank(backend_ip));
+      executor_ip, scan_range_length, GetExecutorRank(executor_ip));
 
   // See if the read will be remote. This is not the case if the impalad runs on one of
   // the replica's datanodes.
@@ -869,8 +848,8 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
   for (const TScanRangeLocation& location : scan_range_locations.locations) {
     const TNetworkAddress& replica_host = host_list[location.host_idx];
     IpAddr replica_ip;
-    if (backend_config_.LookUpBackendIp(replica_host.hostname, &replica_ip)
-        && backend_ip == replica_ip) {
+    if (executors_config_.LookUpBackendIp(replica_host.hostname, &replica_ip)
+        && executor_ip == replica_ip) {
       remote_read = false;
       volume_id = location.volume_id;
       is_cached = location.is_cached;
@@ -892,7 +871,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
   }
 
   PerNodeScanRanges* scan_ranges =
-      FindOrInsert(assignment, backend.address, PerNodeScanRanges());
+      FindOrInsert(assignment, executor.address, PerNodeScanRanges());
   vector<TScanRangeParams>* scan_range_params_list =
       FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());
   // Add scan range.
@@ -904,7 +883,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
   scan_range_params_list->push_back(scan_range_params);
 
   if (VLOG_FILE_IS_ON) {
-    VLOG_FILE << "Scheduler assignment to backend: " << backend.address << "("
+    VLOG_FILE << "Scheduler assignment to executor: " << executor.address << "("
               << (remote_read ? "remote" : "local") << " selection)";
   }
 }
@@ -932,16 +911,16 @@ void Scheduler::AssignmentCtx::PrintAssignment(
 
 void Scheduler::AddressableAssignmentHeap::InsertOrUpdate(
     const IpAddr& ip, int64_t assigned_bytes, int rank) {
-  auto handle_it = backend_handles_.find(ip);
-  if (handle_it == backend_handles_.end()) {
-    AssignmentHeap::handle_type handle = backend_heap_.push({assigned_bytes, rank, ip});
-    backend_handles_.emplace(ip, handle);
+  auto handle_it = executor_handles_.find(ip);
+  if (handle_it == executor_handles_.end()) {
+    AssignmentHeap::handle_type handle = executor_heap_.push({assigned_bytes, rank, ip});
+    executor_handles_.emplace(ip, handle);
   } else {
     // We need to rebuild the heap after every update operation. Calling decrease once is
     // sufficient as both assignments decrease the key.
     AssignmentHeap::handle_type handle = handle_it->second;
     (*handle).assigned_bytes += assigned_bytes;
-    backend_heap_.decrease(handle);
+    executor_heap_.decrease(handle);
   }
 }
 }
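
Editorial sketch (not part of the commit): the scheduler.cc changes above pick the
remote executor with the fewest assigned bytes and break ties by a per-executor rank.
The following is a minimal standalone illustration of that ordering, assuming a
std::set as a stand-in for the boost binomial heap used by the patch. The class name
LoadTracker and its members are hypothetical and do not exist in Impala.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <tuple>

// Hypothetical stand-in for AddressableAssignmentHeap. Executors are kept ordered by
// the key (assigned_bytes, rank), so the first element is the least-loaded executor
// and ties on bytes go to the lowest rank.
class LoadTracker {
 public:
  // Add 'bytes' to the executor identified by 'ip', inserting it on first use.
  // 'rank' is only recorded on first insertion.
  void InsertOrUpdate(const std::string& ip, int64_t bytes, int rank) {
    auto it = info_by_ip_.find(ip);
    if (it == info_by_ip_.end()) {
      info_by_ip_.emplace(ip, Info{bytes, rank});
      ordered_.insert(std::make_tuple(bytes, rank, ip));
      return;
    }
    // Re-key the element: remove the old key, bump the byte count, insert the new key.
    Info& info = it->second;
    ordered_.erase(std::make_tuple(info.assigned_bytes, info.rank, ip));
    info.assigned_bytes += bytes;
    ordered_.insert(std::make_tuple(info.assigned_bytes, info.rank, ip));
  }

  // IP of the executor with the fewest assigned bytes. Must not be called when empty.
  const std::string& Top() const {
    assert(!ordered_.empty());
    return std::get<2>(*ordered_.begin());
  }

  size_t Size() const { return ordered_.size(); }

 private:
  struct Info {
    int64_t assigned_bytes;
    int rank;
  };
  std::map<std::string, Info> info_by_ip_;
  std::set<std::tuple<int64_t, int, std::string>> ordered_;
};
```

With two executors at equal load, Top() returns the one with the lower rank. Once that
executor receives more bytes, Top() moves to the other one. The patch itself updates
the key in place through a boost heap handle rather than erasing and re-inserting.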

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index ca520c8..7adfb5d 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -48,12 +48,12 @@ namespace test {
 class SchedulerWrapper;
 }
 
-/// Performs simple scheduling by matching between a list of backends configured
+/// Performs simple scheduling by matching between a list of executor backends configured
 /// either from the statestore, or from a static list of addresses, and a list
-/// of target data locations. The current set of backends is stored in backend_config_.
-/// When receiving changes to the backend configuration from the statestore we will make a
-/// copy of this configuration, apply the updates to the copy and atomically swap the
-/// contents of the backend_config_ pointer.
+/// of target data locations. The current set of executors is stored in executors_config_.
+/// When receiving changes to the executor configuration from the statestore we will make
+/// a copy of this configuration, apply the updates to the copy and atomically swap the
+/// contents of the executors_config_ pointer.
 ///
 /// TODO: Notice when there are duplicate statestore registrations (IMPALA-23)
 /// TODO: Track assignments (assignment_ctx in ComputeScanRangeAssignment) per query
@@ -67,9 +67,9 @@ class SchedulerWrapper;
 ///       to make it testable.
 /// TODO: Benchmark the performance of the scheduler. The tests need to include setups
 ///       with:
-///         - Small and large number of backends.
+///         - Small and large number of executors.
 ///         - Small and large query plans.
-///         - Scheduling query plans with concurrent updates to the internal backend
+///         - Scheduling query plans with concurrent updates to the internal executor
 ///           configuration.
 class Scheduler {
  public:
@@ -102,28 +102,28 @@ class Scheduler {
   Status Schedule(QuerySchedule* schedule);
 
  private:
-  /// Map from a host's IP address to the next backend to be round-robin scheduled for
-  /// that host (needed for setups with multiple backends on a single host)
+  /// Map from a host's IP address to the next executor to be round-robin scheduled for
+  /// that host (needed for setups with multiple executors on a single host)
   typedef boost::unordered_map<IpAddr, BackendConfig::BackendList::const_iterator>
-      NextBackendPerHost;
+      NextExecutorPerHost;
 
-  typedef std::shared_ptr<const BackendConfig> BackendConfigPtr;
+  typedef std::shared_ptr<const BackendConfig> ExecutorsConfigPtr;
 
-  /// Internal structure to track scan range assignments for a backend host. This struct
+  /// Internal structure to track scan range assignments for an executor host. This struct
   /// is used as the heap element in and maintained by AddressableAssignmentHeap.
-  struct BackendAssignmentInfo {
-    /// The number of bytes assigned to a backend host.
+  struct ExecutorAssignmentInfo {
+    /// The number of bytes assigned to an executor.
     int64_t assigned_bytes;
 
     /// Each host gets assigned a random rank to break ties in a random but deterministic
     /// order per plan node.
     const int random_rank;
 
-    /// IP address of the backend.
+    /// IP address of the executor.
     IpAddr ip;
 
     /// Compare two elements of this struct. The key is (assigned_bytes, random_rank).
-    bool operator>(const BackendAssignmentInfo& rhs) const {
+    bool operator>(const ExecutorAssignmentInfo& rhs) const {
       if (assigned_bytes != rhs.assigned_bytes) {
         return assigned_bytes > rhs.assigned_bytes;
       }
@@ -132,87 +132,87 @@ class Scheduler {
   };
 
   /// Heap to compute candidates for scan range assignments. Elements are of type
-  /// BackendAssignmentInfo and track assignment information for each backend. By default
-  /// boost implements a max-heap so we use std::greater<T> to obtain a min-heap. This
-  /// will make the top() element of the heap be the backend with the lowest number of
-  /// assigned bytes and the lowest random rank.
-  typedef boost::heap::binomial_heap<BackendAssignmentInfo,
-      boost::heap::compare<std::greater<BackendAssignmentInfo>>>
+  /// ExecutorAssignmentInfo and track assignment information for each executor. By
+  /// default boost implements a max-heap so we use std::greater<T> to obtain a min-heap.
+  /// This will make the top() element of the heap be the executor with the lowest number
+  /// of assigned bytes and the lowest random rank.
+  typedef boost::heap::binomial_heap<ExecutorAssignmentInfo,
+      boost::heap::compare<std::greater<ExecutorAssignmentInfo>>>
       AssignmentHeap;
 
   /// Map to look up handles to heap elements to modify heap element keys.
-  typedef boost::unordered_map<IpAddr, AssignmentHeap::handle_type> BackendHandleMap;
+  typedef boost::unordered_map<IpAddr, AssignmentHeap::handle_type> ExecutorHandleMap;
 
-  /// Class to store backend information in an addressable heap. In addition to
+  /// Class to store executor information in an addressable heap. In addition to
   /// AssignmentHeap it can be used to look up heap elements by their IP address and
   /// update their key. For each plan node we create a new heap, so they are not shared
   /// between concurrent invocations of the scheduler.
   class AddressableAssignmentHeap {
    public:
-    const AssignmentHeap& backend_heap() const { return backend_heap_; }
-    const BackendHandleMap& backend_handles() const { return backend_handles_; }
+    const AssignmentHeap& executor_heap() const { return executor_heap_; }
+    const ExecutorHandleMap& executor_handles() const { return executor_handles_; }
 
     void InsertOrUpdate(const IpAddr& ip, int64_t assigned_bytes, int rank);
 
     // Forward interface for boost::heap
-    decltype(auto) size() const { return backend_heap_.size(); }
-    decltype(auto) top() const { return backend_heap_.top(); }
+    decltype(auto) size() const { return executor_heap_.size(); }
+    decltype(auto) top() const { return executor_heap_.top(); }
 
     // Forward interface for boost::unordered_map
-    decltype(auto) find(const IpAddr& ip) const { return backend_handles_.find(ip); }
-    decltype(auto) end() const { return backend_handles_.end(); }
+    decltype(auto) find(const IpAddr& ip) const { return executor_handles_.find(ip); }
+    decltype(auto) end() const { return executor_handles_.end(); }
 
    private:
-    // Heap to determine next backend.
-    AssignmentHeap backend_heap_;
-    // Maps backend IPs to handles in the heap.
-    BackendHandleMap backend_handles_;
+    // Heap to determine next executor.
+    AssignmentHeap executor_heap_;
+    // Maps executor IPs to handles in the heap.
+    ExecutorHandleMap executor_handles_;
   };
 
   /// Class to store context information on assignments during scheduling. It is
-  /// initialized with a copy of the global backend information and assigns a random rank
-  /// to each backend to break ties in cases where multiple backends have been assigned
-  /// the same number or bytes. It tracks the number of assigned bytes, which backends
+  /// initialized with a copy of the global executor information and assigns a random rank
+  /// to each executor to break ties in cases where multiple executors have been assigned
+  /// the same number or bytes. It tracks the number of assigned bytes, which executors
   /// have already been used, etc. Objects of this class are created in
   /// ComputeScanRangeAssignment() and thus don't need to be thread safe.
   class AssignmentCtx {
    public:
-    AssignmentCtx(const BackendConfig& backend_config, IntCounter* total_assignments,
+    AssignmentCtx(const BackendConfig& executor_config, IntCounter* total_assignments,
         IntCounter* total_local_assignments);
 
     /// Among hosts in 'data_locations', select the one with the minimum number of
-    /// assigned bytes. If backends have been assigned equal amounts of work and
-    /// 'break_ties_by_rank' is true, then the backend rank is used to break ties.
-    /// Otherwise the first backend according to their order in 'data_locations' is
+    /// assigned bytes. If executors have been assigned equal amounts of work and
+    /// 'break_ties_by_rank' is true, then the executor rank is used to break ties.
+    /// Otherwise the first executor according to their order in 'data_locations' is
     /// selected.
-    const IpAddr* SelectLocalBackendHost(
+    const IpAddr* SelectLocalExecutor(
         const std::vector<IpAddr>& data_locations, bool break_ties_by_rank);
 
-    /// Select a backend host for a remote read. If there are unused backend hosts, then
+    /// Select an executor for a remote read. If there are unused executor hosts, then
     /// those will be preferred. Otherwise the one with the lowest number of assigned
-    /// bytes is picked. If backends have been assigned equal amounts of work, then the
-    /// backend rank is used to break ties.
-    const IpAddr* SelectRemoteBackendHost();
+    /// bytes is picked. If executors have been assigned equal amounts of work, then the
+    /// executor rank is used to break ties.
+    const IpAddr* SelectRemoteExecutor();
 
-    /// Return the next backend that has not been assigned to. This assumes that a
-    /// returned backend will also be assigned to. The caller must make sure that
-    /// HasUnusedBackends() is true.
-    const IpAddr* GetNextUnusedBackendAndIncrement();
+    /// Return the next executor that has not been assigned to. This assumes that a
+    /// returned executor will also be assigned to. The caller must make sure that
+    /// HasUnusedExecutors() is true.
+    const IpAddr* GetNextUnusedExecutorAndIncrement();
 
-    /// Pick a backend in round-robin fashion from multiple backends on a single host.
-    void SelectBackendOnHost(const IpAddr& backend_ip, TBackendDescriptor* backend);
+    /// Pick an executor in round-robin fashion from multiple executors on a single host.
+    void SelectExecutorOnHost(const IpAddr& executor_ip, TBackendDescriptor* executor);
 
     /// Build a new TScanRangeParams object and append it to the assignment list for the
-    /// tuple (backend, node_id) in 'assignment'. Also, update assignment_heap_ and
+    /// tuple (executor, node_id) in 'assignment'. Also, update assignment_heap_ and
     /// assignment_byte_counters_, increase the counters 'total_assignments_' and
     /// 'total_local_assignments_'. 'scan_range_locations' contains information about the
     /// scan range and its replica locations.
-    void RecordScanRangeAssignment(const TBackendDescriptor& backend, PlanNodeId node_id,
+    void RecordScanRangeAssignment(const TBackendDescriptor& executor, PlanNodeId node_id,
         const vector<TNetworkAddress>& host_list,
         const TScanRangeLocationList& scan_range_locations,
         FragmentScanRangeAssignment* assignment);
 
-    const BackendConfig& backend_config() const { return backend_config_; }
+    const BackendConfig& executor_config() const { return executors_config_; }
 
     /// Print the assignment and statistics to VLOG_FILE.
     void PrintAssignment(const FragmentScanRangeAssignment& assignment);
@@ -225,26 +225,26 @@ class Scheduler {
       int64_t cached_bytes = 0;
     };
 
-    /// Used to look up hostnames to IP addresses and IP addresses to backend.
-    const BackendConfig& backend_config_;
+    /// Used to look up hostnames to IP addresses and IP addresses to executors.
+    const BackendConfig& executors_config_;
 
-    // Addressable heap to select remote backends from. Elements are ordered by the number
-    // of already assigned bytes (and a random rank to break ties).
+    // Addressable heap to select remote executors from. Elements are ordered by the
+    // number of already assigned bytes (and a random rank to break ties).
     AddressableAssignmentHeap assignment_heap_;
 
-    /// Store a random rank per backend host to break ties between otherwise equivalent
+    /// Store a random rank per executor host to break ties between otherwise equivalent
     /// replicas (e.g., those having the same number of assigned bytes).
-    boost::unordered_map<IpAddr, int> random_backend_rank_;
+    boost::unordered_map<IpAddr, int> random_executor_rank_;
 
-    // Index into random_backend_order. It points to the first unused backend and is used
-    // to select unused backends and inserting them into the assignment_heap_.
-    int first_unused_backend_idx_;
+    /// Index into random_executor_order. It points to the first unused executor and is
+    /// used to select unused executors and inserting them into the assignment_heap_.
+    int first_unused_executor_idx_;
 
-    /// Store a random permutation of backend hosts to select backends from.
-    std::vector<IpAddr> random_backend_order_;
+    /// Store a random permutation of executor hosts to select executors from.
+    std::vector<IpAddr> random_executor_order_;
 
-    /// Track round robin information per backend host.
-    NextBackendPerHost next_backend_per_host_;
+    /// Track round robin information per executor host.
+    NextExecutorPerHost next_executor_per_host_;
 
     /// Track number of assigned bytes that have been read from cache, locally, or
     /// remotely.
@@ -254,39 +254,40 @@ class Scheduler {
     IntCounter* total_assignments_;
     IntCounter* total_local_assignments_;
 
-    /// Return whether there are backends that have not been assigned a scan range.
-    bool HasUnusedBackends() const;
+    /// Return whether there are executors that have not been assigned a scan range.
+    bool HasUnusedExecutors() const;
 
-    /// Return the rank of a backend.
-    int GetBackendRank(const IpAddr& ip) const;
+    /// Return the rank of an executor.
+    int GetExecutorRank(const IpAddr& ip) const;
   };
 
-  /// The scheduler's backend configuration. When receiving changes to the backend
+  /// The scheduler's executors configuration. When receiving changes to the executors
   /// configuration from the statestore we will make a copy of the stored object, apply
   /// the updates to the copy and atomically swap the contents of this pointer. Each plan
-  /// node creates a read-only copy of the scheduler's current backend_config_ to use
+  /// node creates a read-only copy of the scheduler's current executors_config_ to use
   /// during scheduling.
-  BackendConfigPtr backend_config_;
+  ExecutorsConfigPtr executors_config_;
 
   /// A backend configuration which only contains the local backend. It is used when
   /// scheduling on the coordinator.
   BackendConfig coord_only_backend_config_;
 
-  /// Protect access to backend_config_ which might otherwise be updated asynchronously
+  /// Protect access to executors_config_ which might otherwise be updated asynchronously
   /// with respect to reads.
-  mutable boost::mutex backend_config_lock_;
+  mutable boost::mutex executors_config_lock_;
 
-  /// Total number of scan ranges assigned to backends during the lifetime of the
+  /// Total number of scan ranges assigned to executors during the lifetime of the
   /// scheduler.
   int64_t num_assignments_;
 
-  /// Map from unique backend id to TBackendDescriptor. Used to track the known backends
-  /// from the statestore. It's important to track both the backend ID as well as the
-  /// TBackendDescriptor so we know what is being removed in a given update.
-  /// Locking of this map is not needed since it should only be read/modified from
-  /// within the UpdateMembership() function.
+  /// Map from unique backend ID to TBackendDescriptor. The
+  /// {backend ID, TBackendDescriptor} pairs represent the IMPALA_MEMBERSHIP_TOPIC
+  /// {key, value} pairs of known executors retrieved from the statestore. It's important
+  /// to track both the backend ID as well as the TBackendDescriptor so we know what is
+  /// being removed in a given update. Locking of this map is not needed since it should
+  /// only be read/modified from within the UpdateMembership() function.
   typedef boost::unordered_map<std::string, TBackendDescriptor> BackendIdMap;
-  BackendIdMap current_membership_;
+  BackendIdMap current_executors_;
 
   /// MetricGroup subsystem access
   MetricGroup* metrics_;
@@ -314,30 +315,22 @@ class Scheduler {
   /// Initialization metric
   BooleanProperty* initialized_;
 
-  /// Current number of backends
+  /// Current number of executors
   IntGauge* num_fragment_instances_metric_;
 
   /// Used for user-to-pool resolution and looking up pool configurations. Not owned by
   /// us.
   RequestPoolService* request_pool_service_;
 
-  /// Helper methods to access backend_config_ (the shared_ptr, not its contents),
-  /// protecting the access with backend_config_lock_.
-  BackendConfigPtr GetBackendConfig() const;
-  void SetBackendConfig(const BackendConfigPtr& backend_config);
+  /// Helper methods to access executors_config_ (the shared_ptr, not its contents),
+  /// protecting the access with executors_config_lock_.
+  ExecutorsConfigPtr GetExecutorsConfig() const;
+  void SetExecutorsConfig(const ExecutorsConfigPtr& executors_config);
 
   /// Called asynchronously when an update is received from the subscription manager
   void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);
 
-  /// Webserver callback that produces a list of known backends.
-  /// Example output:
-  /// "backends": [
-  ///     "henry-metrics-pkg-cdh5.ent.cloudera.com:22000"
-  ///              ],
-  void BackendsUrlCallback(
-      const Webserver::ArgumentMap& args, rapidjson::Document* document);
-
   /// Determine the pool for a user and query options via request_pool_service_.
   Status GetRequestPool(const std::string& user, const TQueryOptions& query_options,
       std::string* pool) const;
@@ -356,18 +349,19 @@ class Scheduler {
   /// Otherwise the assignment is computed for each scan range as follows:
   ///
   /// Scan ranges refer to data, which is usually replicated on multiple hosts. All scan
-  /// ranges where one of the replica hosts also runs an impala backend are processed
-  /// first. If more than one of the replicas run an impala backend, then the 'memory
-  /// distance' of each backend is considered. The concept of memory distance reflects the
-  /// cost of moving data into the processing backend's main memory. Reading from cached
-  /// replicas is generally considered less costly than reading from a local disk, which
-  /// in turn is cheaper than reading data from a remote node. If multiple backends of the
-  /// same memory distance are found, then the one with the least amount of previously
-  /// assigned work is picked, thus aiming to distribute the work as evenly as possible.
+  /// ranges where one of the replica hosts also runs an impala executor are processed
+  /// first. If more than one of the replicas run an impala executor, then the 'memory
+  /// distance' of each executor is considered. The concept of memory distance reflects
+  /// the cost of moving data into the processing executor's main memory. Reading from
+  /// cached replicas is generally considered less costly than reading from a local disk,
+  /// which in turn is cheaper than reading data from a remote node. If multiple executors
+  /// of the same memory distance are found, then the one with the least amount of
+  /// previously assigned work is picked, thus aiming to distribute the work as evenly as
+  /// possible.
   ///
-  /// Finally, scan ranges are considered which do not have an impalad backend running on
+  /// Finally, scan ranges are considered which do not have an impalad executor running on
   /// any of their data nodes. They will be load-balanced by assigned bytes across all
-  /// backends
+  /// executors.
   ///
   /// The resulting assignment is influenced by the following query options:
   ///
@@ -384,9 +378,9 @@ class Scheduler {
   ///   false.
   ///
   /// schedule_random_replica:
-  ///   When equivalent backends with a memory distance of DISK_LOCAL are found for a scan
-  ///   range (same memory distance, same amount of assigned work), then the first one
-  ///   will be picked deterministically. This aims to make better use of OS buffer
+  ///   When equivalent executors with a memory distance of DISK_LOCAL are found for a
+  ///   scan range (same memory distance, same amount of assigned work), then the first
+  ///   one will be picked deterministically. This aims to make better use of OS buffer
   ///   caches, but can lead to performance bottlenecks on individual hosts. Setting this
   ///   option to true will randomly change the order in which equivalent replicas are
   ///   picked for different plan nodes. This helps to compute a more even assignment,
@@ -396,17 +390,17 @@ class Scheduler {
   ///
   /// The method takes the following parameters:
   ///
-  /// backend_config:          Backend configuration to use for scheduling.
+  /// executor_config:         Executor configuration to use for scheduling.
   /// node_id:                 ID of the plan node.
   /// node_replica_preference: Query hint equivalent to replica_preference.
   /// node_random_replica:     Query hint equivalent to schedule_random_replica.
-  /// locations:               List of scan ranges to be assigned to backends.
+  /// locations:               List of scan ranges to be assigned to executors.
   /// host_list:               List of hosts, into which 'locations' will index.
   /// exec_at_coord:           Whether to schedule all scan ranges on the coordinator.
   /// query_options:           Query options for the current query.
   /// timer:                   Tracks execution time of ComputeScanRangeAssignment.
   /// assignment:              Output parameter, to which new assignments will be added.
-  Status ComputeScanRangeAssignment(const BackendConfig& backend_config,
+  Status ComputeScanRangeAssignment(const BackendConfig& executor_config,
       PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
       bool node_random_replica, const std::vector<TScanRangeLocationList>& locations,
       const std::vector<TNetworkAddress>& host_list, bool exec_at_coord,
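
Note: the assignment policy described in the comment above can be sketched roughly as follows. This is a minimal illustration in Python, not the scheduler's actual implementation. The MemoryDistance enum, the assign_scan_ranges() function, and the tuple layout of the inputs are all assumptions made for the example. Ties are broken by picking the first candidate, which mirrors the deterministic behaviour described for schedule_random_replica=false.

```python
from enum import IntEnum


class MemoryDistance(IntEnum):
    """Lower is cheaper. Names are illustrative, not Impala's actual enum."""
    CACHE_LOCAL = 0
    DISK_LOCAL = 1
    REMOTE = 2


def assign_scan_ranges(scan_ranges, executors):
    """Assign each scan range to one executor (hypothetical helper).

    scan_ranges: list of (range_id, num_bytes, replicas), where replicas is a
                 list of (host, MemoryDistance) pairs.
    executors:   list of hosts that run an executor.
    Returns a dict mapping range_id to the chosen host.
    """
    if not executors:
        raise ValueError("no executors available")
    assigned_bytes = {host: 0 for host in executors}
    assignment = {}
    remote_ranges = []

    # Pass 1: ranges with at least one replica on an executor host.
    for range_id, num_bytes, replicas in scan_ranges:
        local = [(dist, host) for host, dist in replicas
                 if host in assigned_bytes]
        if not local:
            remote_ranges.append((range_id, num_bytes))
            continue
        # Prefer the smallest memory distance, then the least assigned work.
        # min() returns the first minimal element, so ties are deterministic.
        _, host = min(local, key=lambda c: (c[0], assigned_bytes[c[1]]))
        assignment[range_id] = host
        assigned_bytes[host] += num_bytes

    # Pass 2: ranges with no local executor are balanced by assigned bytes.
    for range_id, num_bytes in remote_ranges:
        host = min(executors, key=lambda h: assigned_bytes[h])
        assignment[range_id] = host
        assigned_bytes[host] += num_bytes

    return assignment
```

With two executors "a" and "b", a range cached on "b" goes to "b", a range that is disk-local on both then goes to the less loaded "a", and a range with no replica on either executor goes to whichever has fewer assigned bytes.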

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index ff3c7bc..b9f3382 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -76,6 +76,9 @@ static Status ParseIdFromArguments(const Webserver::ArgumentMap& args, TUniqueId
 void ImpalaHttpHandler::RegisterHandlers(Webserver* webserver) {
   DCHECK(webserver != NULL);
 
+  webserver->RegisterUrlCallback("/backends", "backends.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::BackendsHandler));
+
   webserver->RegisterUrlCallback("/hadoop-varz", "hadoop-varz.tmpl",
       MakeCallback(this, &ImpalaHttpHandler::HadoopVarzHandler));
 
@@ -777,3 +780,19 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include
   Value json_id(PrintId(query_id).c_str(), document->GetAllocator());
   document->AddMember("query_id", json_id, document->GetAllocator());
 }
+
+void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  Value backends_list(kArrayType);
+  for (const auto& entry : server_->GetKnownBackends()) {
+    TBackendDescriptor backend = entry.second;
+    Value backend_obj(kObjectType);
+    Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator());
+    backend_obj.AddMember("address", str, document->GetAllocator());
+    backend_obj.AddMember("is_coordinator", backend.is_coordinator,
+        document->GetAllocator());
+    backend_obj.AddMember("is_executor", backend.is_executor, document->GetAllocator());
+    backends_list.PushBack(backend_obj, document->GetAllocator());
+  }
+  document->AddMember("backends", backends_list, document->GetAllocator());
+}
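
Note: the JSON document that BackendsHandler() builds has the shape sketched below. This is a Python illustration only. The dict-based descriptor layout and the backends_document() name are assumptions for the example, and the "host:port" rendering assumes that is what TNetworkAddressToString() produces.

```python
import json


def backends_document(known_backends):
    """Build the /backends document from a map of backend ID to descriptor.

    Each descriptor is a plain dict here (hypothetical stand-in for
    TBackendDescriptor) with hostname, port, is_coordinator and is_executor.
    """
    backends = []
    for descriptor in known_backends.values():
        backends.append({
            "address": "%s:%d" % (descriptor["hostname"], descriptor["port"]),
            "is_coordinator": descriptor["is_coordinator"],
            "is_executor": descriptor["is_executor"],
        })
    return {"backends": backends}


if __name__ == "__main__":
    example = {
        "backend-1": {"hostname": "localhost", "port": 21000,
                      "is_coordinator": True, "is_executor": False},
    }
    print(json.dumps(backends_document(example), indent=2))
```

The backends.tmpl template then iterates over the "backends" array and renders one table row per entry.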

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-http-handler.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.h b/be/src/service/impala-http-handler.h
index 7be4729..485f6db 100644
--- a/be/src/service/impala-http-handler.h
+++ b/be/src/service/impala-http-handler.h
@@ -144,6 +144,16 @@ class ImpalaHttpHandler {
   /// QueryStateHandler().
   void QueryStateToJson(const ImpalaServer::QueryStateRecord& record,
       rapidjson::Value* value, rapidjson::Document* document);
+
+  /// Json callback for /backends, which prints a table of known backends.
+  /// "backends" : [
+  ///   {
+  ///     "address": "localhost:21000",
+  ///     "is_coordinator": true,
+  ///     "is_executor": false
+  ///   }
+  /// ]
+  void BackendsHandler(const Webserver::ArgumentMap& args, rapidjson::Document* document);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 80c1507..86cb0c9 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -86,6 +86,7 @@
 using boost::adopt_lock_t;
 using boost::algorithm::is_any_of;
 using boost::algorithm::istarts_with;
+using boost::algorithm::join;
 using boost::algorithm::replace_all_copy;
 using boost::algorithm::split;
 using boost::algorithm::token_compress_on;
@@ -96,6 +97,7 @@ using boost::uuids::uuid;
 using namespace apache::thrift;
 using namespace boost::posix_time;
 using namespace beeswax;
+using namespace rapidjson;
 using namespace strings;
 
 DECLARE_int32(be_port);
@@ -178,8 +180,9 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i
     " the maximum allowable timeout.");
 
 DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and coordinate "
-    "queries from clients. If false, this daemon will only execute query fragments, and "
-    "will refuse client connections.");
+    "queries from clients. If false, it will refuse client connections.");
+DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
+    "fragments.");
 
 // TODO: Remove for Impala 3.0.
 DEFINE_string(local_nodemanager_url, "", "Deprecated");
@@ -370,6 +373,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
       bind<void>(&ImpalaServer::ExpireQueries, this)));
 
   is_coordinator_ = FLAGS_is_coordinator;
+  is_executor_ = FLAGS_is_executor;
   exec_env_->SetImpalaServer(this);
 }
 
@@ -405,6 +409,11 @@ Status ImpalaServer::LogLineageRecord(const QueryExecState& query_exec_state) {
 }
 
 bool ImpalaServer::IsCoordinator() { return is_coordinator_; }
+bool ImpalaServer::IsExecutor() { return is_executor_; }
+
+const ImpalaServer::BackendDescriptorMap& ImpalaServer::GetKnownBackends() {
+  return known_backends_;
+}
 
 bool ImpalaServer::IsLineageLoggingEnabled() {
   return !FLAGS_lineage_event_log_dir.empty();
@@ -1585,6 +1594,8 @@ void ImpalaServer::AddLocalBackendToStatestore(
   if (known_backends_.find(local_backend_id) != known_backends_.end()) return;
 
   TBackendDescriptor local_backend_descriptor;
+  local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
+  local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
   local_backend_descriptor.__set_address(
       MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
   IpAddr ip;
@@ -1889,6 +1900,9 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
   DCHECK((beeswax_port == 0) == (beeswax_server == nullptr));
   DCHECK((hs2_port == 0) == (hs2_server == nullptr));
   DCHECK((be_port == 0) == (be_server == nullptr));
+  if (!FLAGS_is_coordinator && !FLAGS_is_executor) {
+    return Status("Impala server needs to have a role (EXECUTOR, COORDINATOR)");
+  }
 
   impala_server->reset(new ImpalaServer(exec_env));
 
@@ -1910,8 +1924,10 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
 
     LOG(INFO) << "ImpalaInternalService listening on " << be_port;
   }
+
   if (!FLAGS_is_coordinator) {
-    LOG(INFO) << "Started worker Impala server on "
+
+    LOG(INFO) << "Started executor Impala server on "
               << ExecEnv::GetInstance()->backend_address();
     return Status::OK();
   }
@@ -1961,7 +1977,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
     LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_port;
   }
 
-  LOG(INFO) << "Started coordinator Impala server on "
+  LOG(INFO) << "Started coordinator/executor Impala server on "
             << ExecEnv::GetInstance()->backend_address();
   return Status::OK();
 }
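
Note: the role handling in CreateImpalaServer() above can be summarised with the small Python sketch below. The describe_role() name is hypothetical. The strings are taken from the log messages and the error status in the diff. As written in the diff, a coordinator started with -is_executor=false is still logged as "coordinator/executor", and the sketch mirrors that.

```python
def describe_role(is_coordinator, is_executor):
    """Return the startup label for a daemon given its two role flags.

    Hypothetical helper that mirrors the checks in the diff: a daemon with
    neither role is rejected, a non-coordinator is labelled "executor", and
    any coordinator is labelled "coordinator/executor".
    """
    if not is_coordinator and not is_executor:
        raise ValueError(
            "Impala server needs to have a role (EXECUTOR, COORDINATOR)")
    if not is_coordinator:
        return "executor"
    return "coordinator/executor"
```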

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 45e8080..34e204d 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -273,6 +273,12 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   /// Returns true if this is a coordinator, false otherwise.
   bool IsCoordinator();
 
+  /// Returns true if this is an executor, false otherwise.
+  bool IsExecutor();
+
+  typedef boost::unordered_map<std::string, TBackendDescriptor> BackendDescriptorMap;
+  const BackendDescriptorMap& GetKnownBackends();
+
   /// The prefix of audit event log filename.
   static const string AUDIT_EVENT_LOG_FILE_PREFIX;
 
@@ -853,16 +859,17 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
       QueryLocations;
   QueryLocations query_locations_;
 
-  /// A map from unique backend ID to the corresponding TBackendDescriptor of that backend.
-  /// Used to track membership updates from the statestore so queries can be cancelled
-  /// when a backend is removed. It's not enough to just cancel fragments that are running
-  /// based on the deletions mentioned in the most recent statestore heartbeat; sometimes
-  /// cancellations are skipped and the statestore, at its discretion, may send only
-  /// a delta of the current membership so we need to compute any deletions.
+  /// A map from unique backend ID to the corresponding TBackendDescriptor of that
+  /// backend. Used to track membership updates from the statestore so queries can be
+  /// cancelled when a backend is removed. It's not enough to just cancel fragments that
+  /// are running based on the deletions mentioned in the most recent statestore
+  /// heartbeat; sometimes cancellations are skipped and the statestore, at its
+  /// discretion, may send only a delta of the current membership so we need to compute
+  /// any deletions.
   /// TODO: Currently there are multiple locations where cluster membership is tracked,
-  /// here and in the scheduler. This should be consolidated so there is a single component
-  /// (the scheduler?) that tracks this information and calls other interested components.
-  typedef boost::unordered_map<std::string, TBackendDescriptor> BackendDescriptorMap;
+  /// here and in the scheduler. This should be consolidated so there is a single
+  /// component (the scheduler?) that tracks this information and calls other interested
+  /// components.
   BackendDescriptorMap known_backends_;
 
   /// Generate unique session id for HiveServer2 session
@@ -956,6 +963,9 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   /// True if this ImpalaServer can accept client connections and coordinate
   /// queries.
   bool is_coordinator_;
+
+  /// True if this ImpalaServer can execute query fragments.
+  bool is_executor_;
 };
 
 /// Create an ImpalaServer and Thrift servers.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/util/network-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index 0a722cc..4382e99 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -154,6 +154,8 @@ TBackendDescriptor MakeBackendDescriptor(const Hostname& hostname, const IpAddr&
   TBackendDescriptor be_desc;
   be_desc.address = MakeNetworkAddress(hostname, port);
   be_desc.ip_address = ip;
+  be_desc.is_coordinator = true;
+  be_desc.is_executor = true;
   return be_desc;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/util/webserver.cc
----------------------------------------------------------------------
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 3b7c4f9..3e587a2 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -56,6 +56,7 @@ typedef sig_t sighandler_t;
 #endif
 
 using boost::algorithm::is_any_of;
+using boost::algorithm::join;
 using boost::algorithm::split;
 using boost::algorithm::trim_right;
 using boost::algorithm::to_lower;
@@ -193,10 +194,11 @@ void Webserver::RootHandler(const ArgumentMap& args, Document* document) {
 
   ExecEnv* env = ExecEnv::GetInstance();
   if (env == nullptr || env->impala_server() == nullptr) return;
-  string mode = (env->impala_server()->IsCoordinator()) ?
-      "Coordinator + Executor" : "Executor";
-  Value impala_server_mode(mode.c_str(), document->GetAllocator());
-  document->AddMember("impala_server_mode", impala_server_mode, document->GetAllocator());
+  document->AddMember("impala_server_mode", true, document->GetAllocator());
+  document->AddMember("is_coordinator", env->impala_server()->IsCoordinator(),
+      document->GetAllocator());
+  document->AddMember("is_executor", env->impala_server()->IsExecutor(),
+      document->GetAllocator());
 }
 
 void Webserver::ErrorHandler(const ArgumentMap& args, Document* document) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 37671e3..2b42959 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -38,6 +38,10 @@ parser.add_option("-s", "--cluster_size", type="int", dest="cluster_size", defau
                   help="Size of the cluster (number of impalad instances to start).")
 parser.add_option("-c", "--num_coordinators", type="int", dest="num_coordinators",
                   default=3, help="Number of coordinators.")
+parser.add_option("--use_exclusive_coordinators", dest="use_exclusive_coordinators",
+                  action="store_true", default=False, help="If true, coordinators only "
+                  "coordinate queries and execute coordinator fragments. If false, "
+                  "coordinators also act as executors.")
 parser.add_option("--build_type", dest="build_type", default= 'latest',
                   help="Build type to use - debug / release / latest")
 parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
@@ -50,7 +54,7 @@ parser.add_option("--catalogd_args", dest="catalogd_args", action="append",
                   type="string", default=[],
                   help="Additional arguments to pass to the Catalog Service at startup")
 parser.add_option("--kill", "--kill_only", dest="kill_only", action="store_true",
-                  default=False, help="Instead of starting the cluster, just kill all"\
+                  default=False, help="Instead of starting the cluster, just kill all"
                   " the running impalads and the statestored.")
 parser.add_option("--force_kill", dest="force_kill", action="store_true", default=False,
                   help="Force kill impalad and statestore processes.")
@@ -67,7 +71,7 @@ parser.add_option('--max_log_files', default=DEFAULT_IMPALA_MAX_LOG_FILES,
 parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
                   help="Prints all output to stderr/stdout.")
 parser.add_option("--wait_for_cluster", dest="wait_for_cluster", action="store_true",
-                  default=False, help="Wait until the cluster is ready to accept "\
+                  default=False, help="Wait until the cluster is ready to accept "
                   "queries before returning.")
 parser.add_option("--log_level", type="int", dest="log_level", default=1,
                    help="Set the impalad backend logging level")
@@ -206,7 +210,10 @@ def build_jvm_args(instance_num):
   BASE_JVM_DEBUG_PORT = 30000
   return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args)
 
-def start_impalad_instances(cluster_size, num_coordinators):
+def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordinators):
+  """Start 'cluster_size' impalad instances. The first 'num_coordinators' instances will
+    act as coordinators. 'use_exclusive_coordinators' specifies whether the coordinators
+    will only execute coordinator fragments."""
   if cluster_size == 0:
     # No impalad instances should be started.
     return
@@ -244,6 +251,9 @@ def start_impalad_instances(cluster_size, num_coordinators):
 
     if i >= num_coordinators:
       args = "-is_coordinator=false %s" % (args)
+    elif use_exclusive_coordinators:
+      # Coordinator instance that doesn't execute non-coordinator fragments
+      args = "-is_executor=false %s" % (args)
 
     stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
@@ -285,9 +295,9 @@ def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   # impalad processes may take a while to come up.
   wait_for_impala_process_count(impala_cluster)
   for impalad in impala_cluster.impalads:
+    impalad.service.wait_for_num_known_live_backends(options.cluster_size,
+        timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
     if impalad._get_arg_value('is_coordinator', default='true') == 'true':
-      impalad.service.wait_for_num_known_live_backends(options.cluster_size,
-          timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
       wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)
 
 def wait_for_catalog(impalad, timeout_in_seconds):
@@ -337,6 +347,10 @@ if __name__ == "__main__":
     print 'Please specify a valid number of coordinators > 0'
     sys.exit(1)
 
+  if options.use_exclusive_coordinators and options.num_coordinators >= options.cluster_size:
+    print 'Cannot start an Impala cluster with no executors'
+    sys.exit(1)
+
   if not os.path.isdir(options.log_dir):
     print 'Log dir does not exist or is not a directory: %s' % options.log_dir
     sys.exit(1)
@@ -383,7 +397,8 @@ if __name__ == "__main__":
       if not options.restart_impalad_only:
         start_statestore()
         start_catalogd()
-      start_impalad_instances(options.cluster_size, options.num_coordinators)
+      start_impalad_instances(options.cluster_size, options.num_coordinators,
+          options.use_exclusive_coordinators)
       # Sleep briefly to reduce log spam: the cluster takes some time to start up.
       sleep(3)
       wait_for_cluster()
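
Note: the way start-impala-cluster.py maps instance indexes to role flags can be sketched as below. This is a standalone Python illustration. The role_args() function is hypothetical and returns only the role flag for each instance, whereas the real script prepends the flag to the full impalad argument string.

```python
def role_args(cluster_size, num_coordinators, use_exclusive_coordinators):
    """Return the role flag passed to each of 'cluster_size' instances.

    The first 'num_coordinators' instances are coordinators. With
    'use_exclusive_coordinators' they are started with -is_executor=false.
    All remaining instances are started with -is_coordinator=false.
    """
    if use_exclusive_coordinators and num_coordinators >= cluster_size:
        # Same guard as in the script: every instance would be a
        # coordinator that does not execute, leaving no executors.
        raise ValueError("Cannot start an Impala cluster with no executors")
    flags = []
    for i in range(cluster_size):
        if i >= num_coordinators:
            flags.append("-is_coordinator=false")
        elif use_exclusive_coordinators:
            flags.append("-is_executor=false")
        else:
            flags.append("")  # Default: both coordinator and executor.
    return flags


def num_executors(flags):
    """Count instances that were not started with -is_executor=false."""
    return sum(1 for flag in flags if flag != "-is_executor=false")
```

For a cluster of size 3 with one coordinator, this yields 3 executors without exclusive coordinators and 2 with them, which matches the expectations in test_single_coordinator_cluster_config.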

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 1677635..90400b2 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -47,7 +47,8 @@ struct TPoolStats {
 // Structure serialised in the Impala backend topic. Each Impalad
 // constructs one TBackendDescriptor, and registers it in the backend
 // topic. Impalads subscribe to this topic to learn of the location of
-// all other Impalads in the cluster.
+// all other Impalads in the cluster. Impalads can act as coordinators, executors or
+// both.
 struct TBackendDescriptor {
   // Network address of the Impala service on this backend
   1: required Types.TNetworkAddress address;
@@ -56,11 +57,17 @@ struct TBackendDescriptor {
   // cost of resolution at every Impalad (since IP addresses are needed for scheduling)
   2: required string ip_address;
 
+  // True if this is a coordinator node
+  3: required bool is_coordinator;
+
+  // True if this is an executor node
+  4: required bool is_executor;
+
   // The address of the debug HTTP server
-  3: optional Types.TNetworkAddress debug_http_address;
+  5: optional Types.TNetworkAddress debug_http_address;
 
   // True if the debug webserver is secured (for correctly generating links)
-  4: optional bool secure_webserver;
+  6: optional bool secure_webserver;
 }
 
 // Description of a single entry in a topic

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index b19b5d2..377b38e 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -108,13 +108,15 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   @classmethod
   def _start_impala_cluster(cls, options, log_dir=os.getenv('LOG_DIR', "/tmp/"),
-      cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS, log_level=1):
+      cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS,
+      use_exclusive_coordinators=False, log_level=1):
     cls.impala_log_dir = log_dir
     cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
            '--cluster_size=%d' % cluster_size,
            '--num_coordinators=%d' % num_coordinators,
            '--log_dir=%s' % log_dir,
            '--log_level=%s' % log_level]
+    if use_exclusive_coordinators: cmd.append("--use_exclusive_coordinators")
     try:
       check_call(cmd + options, close_fds=True)
     finally:
@@ -125,8 +127,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       raise Exception("statestored was not found")
     statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
     for impalad in cls.cluster.impalads:
-      if impalad._get_arg_value('is_coordinator', default='true') == 'true':
-        impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60)
+      impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60)
 
   def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
     """

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/tests/custom_cluster/test_coordinators.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 6010404..cac4512 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -84,3 +84,37 @@ class TestCoordinators(CustomClusterTestSuite):
       assert num_tbls == 0
       client1.close()
       client2.close()
+
+  @pytest.mark.execute_serially
+  def test_single_coordinator_cluster_config(self):
+    """Test a cluster configuration with a single coordinator."""
+
+    def exec_and_verify_num_executors(expected_num_of_executors):
+      """Connects to the coordinator node, runs a query and verifies that certain
+        operators are executed on 'expected_num_of_executors' nodes."""
+      coordinator = self.cluster.impalads[0]
+      try:
+        client = coordinator.service.create_beeswax_client()
+        assert client is not None
+        query = "select count(*) from functional.alltypesagg"
+        result = self.execute_query_expect_success(client, query)
+        # Verify that SCAN and AGG are executed on the expected number of
+        # executor nodes
+        for rows in result.exec_summary:
+          if rows['operator'] == '00:SCAN HDFS':
+            assert rows['num_hosts'] == expected_num_of_executors
+          elif rows['operator'] == '01:AGGREGATE':
+            assert rows['num_hosts'] == expected_num_of_executors
+      finally:
+        client.close()
+
+    # Cluster config where the coordinator can execute query fragments
+    self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
+        use_exclusive_coordinators=False)
+    exec_and_verify_num_executors(3)
+    # Stop the cluster
+    self._stop_impala_cluster()
+    # Cluster config where the coordinator can only execute coordinator fragments
+    self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
+        use_exclusive_coordinators=True)
+    exec_and_verify_num_executors(2)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/www/backends.tmpl
----------------------------------------------------------------------
diff --git a/www/backends.tmpl b/www/backends.tmpl
index cef77b7..20cfa11 100644
--- a/www/backends.tmpl
+++ b/www/backends.tmpl
@@ -23,13 +23,17 @@ under the License.
 <table id="backends" class='table table-hover table-bordered'>
   <thead>
     <tr>
-      <th>Backend</th>
+      <th>Address</th>
+      <th>Coordinator</th>
+      <th>Executor</th>
     </tr>
   </thead>
   <tbody>
     {{#backends}}
     <tr>
-      <td>{{.}}</td>
+      <td>{{address}}</td>
+      <td>{{is_coordinator}}</td>
+      <td>{{is_executor}}</td>
     </tr>
     {{/backends}}
   </tbody>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/www/root.tmpl
----------------------------------------------------------------------
diff --git a/www/root.tmpl b/www/root.tmpl
index 40d448a..d9c6d2a 100644
--- a/www/root.tmpl
+++ b/www/root.tmpl
@@ -19,7 +19,8 @@ under the License.
 {{! Template for / }}
 {{>www/common-header.tmpl}}
   {{?impala_server_mode}}
-  <h2>Impala Server Mode: {{impala_server_mode}}</h2>
+  <h2>Impala Server Mode: {{?is_coordinator}}Coordinator{{/is_coordinator}} 
+    {{?is_executor}}Executor{{/is_executor}}</h2>
   {{/impala_server_mode}}
 
   <h2>Vers<span id="v">i</span>on</h2>