You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2018/07/24 18:01:42 UTC

[1/5] impala git commit: IMPALA-6299: Use LlvmCodeGen's internal list of white-listed CPU attributes for handcrafting IRs

Repository: impala
Updated Branches:
  refs/heads/master f7efba236 -> ac4acf1b7


IMPALA-6299: Use LlvmCodeGen's internal list of white-listed CPU attributes for handcrafting IRs

Previously, the IRBuilder of the LlvmCodeGen class used CpuInfo's
list of enabled features to determine the validity of certain
instructions to emit intrinsics. It did not consider the whitelist
which passed while initializing the LlvmCodeGen class. Now, the
IRBuilder inspects its own CPU attributes before emitting
instruction. This change also adds functionality to modify the CPU
attributes of the LlvmCodeGen class for testing.

Testing: Verified that the current tests which use and modify
CpuInfo produce expected results.

Change-Id: Ifece8949c143146d2a1b38d72d21c2d733bed90f
Reviewed-on: http://gerrit.cloudera.org:8080/10979
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/514dbb79
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/514dbb79
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/514dbb79

Branch: refs/heads/master
Commit: 514dbb79abe3476c18fc4ffbe36c733a7076dcfd
Parents: f7efba2
Author: poojanilangekar <po...@cloudera.com>
Authored: Thu Jul 12 15:09:17 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Jul 23 23:56:14 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/bloom-filter-benchmark.cc |  2 +
 be/src/codegen/llvm-codegen-test.cc         | 49 ++++++++++++++++++------
 be/src/codegen/llvm-codegen.cc              | 39 +++++++++++++------
 be/src/codegen/llvm-codegen.h               | 24 +++++++++---
 be/src/exec/filter-context.cc               |  2 +-
 be/src/util/cpu-info.cc                     |  8 ++++
 6 files changed, 95 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/514dbb79/be/src/benchmarks/bloom-filter-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/bloom-filter-benchmark.cc b/be/src/benchmarks/bloom-filter-benchmark.cc
index 4657a18..f216911 100644
--- a/be/src/benchmarks/bloom-filter-benchmark.cc
+++ b/be/src/benchmarks/bloom-filter-benchmark.cc
@@ -48,6 +48,8 @@ using namespace impala;
 // As in bloom-filter.h, ndv refers to the number of unique items inserted into a filter
 // and fpp is the probability of false positives.
 //
+// This benchmark must be executed only in RELEASE mode. Since it executes some codepath
+// which would not occur in Impala's execution, it crashes due to a DCHECK in DEBUG mode.
 //
 // Machine Info: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz
 //

http://git-wip-us.apache.org/repos/asf/impala/blob/514dbb79/be/src/codegen/llvm-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc
index 9f668e0..3748ced 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -111,6 +111,31 @@ class LlvmCodeGenTest : public testing:: Test {
     const auto& hf = codegen->handcrafted_functions_;
     return find(hf.begin(), hf.end(), function) != hf.end();
   }
+
+  // Helper function to enable and disable LlvmCodeGen's cpu_attrs_.
+  static void EnableCodeGenCPUFeature(int64_t flag, bool enable) {
+    DCHECK(LlvmCodeGen::llvm_initialized_);
+    auto enable_flag_it = LlvmCodeGen::cpu_flag_mappings_.find(flag);
+    auto disable_flag_it = LlvmCodeGen::cpu_flag_mappings_.find(~flag);
+    DCHECK(enable_flag_it != LlvmCodeGen::cpu_flag_mappings_.end());
+    DCHECK(disable_flag_it != LlvmCodeGen::cpu_flag_mappings_.end());
+    const std::string& enable_feature = enable_flag_it->second;
+    const std::string& disable_feature = disable_flag_it->second;
+    if (enable) {
+      auto attr_it = LlvmCodeGen::cpu_attrs_.find(disable_feature);
+      if (attr_it != LlvmCodeGen::cpu_attrs_.end()) {
+        LlvmCodeGen::cpu_attrs_.erase(attr_it);
+      }
+      LlvmCodeGen::cpu_attrs_.insert(enable_feature);
+    } else {
+      auto attr_it = LlvmCodeGen::cpu_attrs_.find(enable_feature);
+      // Disable feature if currently enabled.
+      if (attr_it != LlvmCodeGen::cpu_attrs_.end()) {
+        LlvmCodeGen::cpu_attrs_.erase(attr_it);
+        LlvmCodeGen::cpu_attrs_.insert(disable_feature);
+      }
+    }
+  }
 };
 
 // Simple test to just make and destroy llvmcodegen objects.  LLVM
@@ -460,9 +485,12 @@ TEST_F(LlvmCodeGenTest, HashTest) {
     uint32_t result = test_fn();
 
     // Validate that the hashes are identical
-    EXPECT_EQ(result, expected_hash) << CpuInfo::IsSupported(CpuInfo::SSE4_2);
+    EXPECT_EQ(result, expected_hash) << LlvmCodeGen::IsCPUFeatureEnabled(CpuInfo::SSE4_2);
 
-    if (i == 0 && CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
+    if (i == 0 && LlvmCodeGen::IsCPUFeatureEnabled(CpuInfo::SSE4_2)) {
+      // Modify both CpuInfo and LlvmCodeGen::cpu_attrs_ to ensure that they have the
+      // same view of the underlying hardware while generating the hash.
+      EnableCodeGenCPUFeature(CpuInfo::SSE4_2, false);
       CpuInfo::EnableFeature(CpuInfo::SSE4_2, false);
       restore_sse_support = true;
       LlvmCodeGenTest::ClearHashFns(codegen.get());
@@ -473,6 +501,7 @@ TEST_F(LlvmCodeGenTest, HashTest) {
   }
 
   // Restore hardware feature for next test
+  EnableCodeGenCPUFeature(CpuInfo::SSE4_2, restore_sse_support);
   CpuInfo::EnableFeature(CpuInfo::SSE4_2, restore_sse_support);
 }
 
@@ -495,19 +524,17 @@ TEST_F(LlvmCodeGenTest, CpuAttrWhitelist) {
   // Non-existent attributes should be disabled regardless of initial states.
   // Whitelisted attributes like sse2 and lzcnt should retain their initial
   // state.
-  EXPECT_EQ(vector<string>(
-          {"-dummy1", "-dummy2", "-dummy3", "-dummy4", "+sse2", "-lzcnt"}),
+  EXPECT_EQ(std::unordered_set<string>(
+                {"-dummy1", "-dummy2", "-dummy3", "-dummy4", "+sse2", "-lzcnt"}),
       LlvmCodeGen::ApplyCpuAttrWhitelist(
-          {"+dummy1", "+dummy2", "-dummy3", "+dummy4", "+sse2", "-lzcnt"}));
+                {"+dummy1", "+dummy2", "-dummy3", "+dummy4", "+sse2", "-lzcnt"}));
 
   // IMPALA-6291: Test that all AVX512 attributes are disabled.
   vector<string> avx512_attrs;
-  EXPECT_EQ(vector<string>(
-        {"-avx512ifma", "-avx512dqavx512er", "-avx512f", "-avx512bw", "-avx512vl",
-         "-avx512cd", "-avx512vbmi", "-avx512pf"}),
-      LlvmCodeGen::ApplyCpuAttrWhitelist(
-        {"+avx512ifma", "+avx512dqavx512er", "+avx512f", "+avx512bw", "+avx512vl",
-         "+avx512cd", "+avx512vbmi", "+avx512pf"}));
+  EXPECT_EQ(std::unordered_set<string>({"-avx512ifma", "-avx512dqavx512er", "-avx512f",
+                "-avx512bw", "-avx512vl", "-avx512cd", "-avx512vbmi", "-avx512pf"}),
+      LlvmCodeGen::ApplyCpuAttrWhitelist({"+avx512ifma", "+avx512dqavx512er", "+avx512f",
+          "+avx512bw", "+avx512vl", "+avx512cd", "+avx512vbmi", "+avx512pf"}));
 }
 
 // Test that exercises the code path that deletes non-finalized methods before it

http://git-wip-us.apache.org/repos/asf/impala/blob/514dbb79/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 173dbb2..d6f89a3 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -119,10 +119,19 @@ namespace impala {
 
 bool LlvmCodeGen::llvm_initialized_ = false;
 string LlvmCodeGen::cpu_name_;
-vector<string> LlvmCodeGen::cpu_attrs_;
+std::unordered_set<string> LlvmCodeGen::cpu_attrs_;
 string LlvmCodeGen::target_features_attr_;
 CodegenCallGraph LlvmCodeGen::shared_call_graph_;
 
+const map<int64_t, std::string> LlvmCodeGen::cpu_flag_mappings_{
+    {CpuInfo::SSSE3, "+ssse3"}, {CpuInfo::SSE4_1, "+sse4.1"},
+    {CpuInfo::SSE4_2, "+sse4.2"}, {CpuInfo::POPCNT, "+popcnt"}, {CpuInfo::AVX, "+avx"},
+    {CpuInfo::AVX2, "+avx2"}, {CpuInfo::PCLMULQDQ, "+pclmul"},
+    {~(CpuInfo::SSSE3), "-ssse3"}, {~(CpuInfo::SSE4_1), "-sse4.1"},
+    {~(CpuInfo::SSE4_2), "-sse4.2"}, {~(CpuInfo::POPCNT), "-popcnt"},
+    {~(CpuInfo::AVX), "-avx"}, {~(CpuInfo::AVX2), "-avx2"},
+    {~(CpuInfo::PCLMULQDQ), "-pclmul"}};
+
 [[noreturn]] static void LlvmCodegenHandleError(
     void* user_data, const string& reason, bool gen_crash_diag) {
   LOG(FATAL) << "LLVM hit fatal error: " << reason.c_str();
@@ -238,7 +247,7 @@ Status LlvmCodeGen::CreateFromMemory(RuntimeState* state, ObjectPool* pool,
   // a machine without SSE4.2 support.
   llvm::StringRef module_ir;
   string module_name;
-  if (CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
+  if (IsCPUFeatureEnabled(CpuInfo::SSE4_2)) {
     module_ir = llvm::StringRef(
         reinterpret_cast<const char*>(impala_sse_llvm_ir), impala_sse_llvm_ir_len);
     module_name = "Impala IR with SSE 4.2 support";
@@ -471,17 +480,24 @@ void LlvmCodeGen::EnableOptimizations(bool enable) {
   optimizations_enabled_ = enable;
 }
 
-void LlvmCodeGen::GetHostCPUAttrs(vector<string>* attrs) {
+void LlvmCodeGen::GetHostCPUAttrs(std::unordered_set<string>* attrs) {
   // LLVM's ExecutionEngine expects features to be enabled or disabled with a list
   // of strings like ["+feature1", "-feature2"].
   llvm::StringMap<bool> cpu_features;
   llvm::sys::getHostCPUFeatures(cpu_features);
   for (const llvm::StringMapEntry<bool>& entry : cpu_features) {
-    attrs->emplace_back(
-        Substitute("$0$1", entry.second ? "+" : "-", entry.first().data()));
+    attrs->emplace(Substitute("$0$1", entry.second ? "+" : "-", entry.first().data()));
   }
 }
 
+bool LlvmCodeGen::IsCPUFeatureEnabled(int64_t flag) {
+  DCHECK(llvm_initialized_);
+  auto enable_flag_it = cpu_flag_mappings_.find(flag);
+  DCHECK(enable_flag_it != cpu_flag_mappings_.end());
+  const std::string& enabled_feature = enable_flag_it->second;
+  return cpu_attrs_.find(enabled_feature) != cpu_attrs_.end();
+}
+
 string LlvmCodeGen::GetIR(bool full_module) const {
   string str;
   llvm::raw_string_ostream stream(str);
@@ -1526,7 +1542,7 @@ void LlvmCodeGen::ClearHashFns() {
 //   ret i32 %12
 // }
 llvm::Function* LlvmCodeGen::GetHashFunction(int num_bytes) {
-  if (CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
+  if (IsCPUFeatureEnabled(CpuInfo::SSE4_2)) {
     if (num_bytes == -1) {
       // -1 indicates variable length, just return the generic loop based
       // hash fn.
@@ -1671,8 +1687,9 @@ llvm::Constant* LlvmCodeGen::ConstantsToGVArrayPtr(llvm::Type* element_type,
   return ConstantToGVPtr(array_type, array_const, name);
 }
 
-vector<string> LlvmCodeGen::ApplyCpuAttrWhitelist(const vector<string>& cpu_attrs) {
-  vector<string> result;
+std::unordered_set<string> LlvmCodeGen::ApplyCpuAttrWhitelist(
+    const std::unordered_set<string>& cpu_attrs) {
+  std::unordered_set<string> result;
   vector<string> attr_whitelist;
   boost::split(attr_whitelist, FLAGS_llvm_cpu_attr_whitelist, boost::is_any_of(","));
   for (const string& attr : cpu_attrs) {
@@ -1680,17 +1697,17 @@ vector<string> LlvmCodeGen::ApplyCpuAttrWhitelist(const vector<string>& cpu_attr
     DCHECK(attr[0] == '-' || attr[0] == '+') << attr;
     if (attr[0] == '-') {
       // Already disabled - copy it over unmodified.
-      result.push_back(attr);
+      result.insert(attr);
       continue;
     }
     const string attr_name = attr.substr(1);
     auto it = std::find(attr_whitelist.begin(), attr_whitelist.end(), attr_name);
     if (it != attr_whitelist.end()) {
       // In whitelist - copy it over unmodified.
-      result.push_back(attr);
+      result.insert(attr);
     } else {
       // Not in whitelist - disable it.
-      result.push_back("-" + attr_name);
+      result.insert("-" + attr_name);
     }
   }
   return result;

http://git-wip-us.apache.org/repos/asf/impala/blob/514dbb79/be/src/codegen/llvm-codegen.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 7e9da26..9ede053 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -238,7 +238,10 @@ class LlvmCodeGen {
   };
 
   /// Get host cpu attributes in format expected by EngineBuilder.
-  static void GetHostCPUAttrs(std::vector<std::string>* attrs);
+  static void GetHostCPUAttrs(std::unordered_set<std::string>* attrs);
+
+  /// Returns whether or not this cpu feature is supported.
+  static bool IsCPUFeatureEnabled(int64_t flag);
 
   /// Return a pointer type to 'type'
   llvm::PointerType* GetPtrType(llvm::Type* type);
@@ -593,6 +596,7 @@ class LlvmCodeGen {
   friend class ExprCodegenTest;
   friend class LlvmCodeGenTest;
   friend class LlvmCodeGenTest_CpuAttrWhitelist_Test;
+  friend class LlvmCodeGenTest_HashTest_Test;
   friend class SubExprElimination;
 
   /// Top level codegen object. 'module_id' is used for debugging when outputting the IR.
@@ -714,21 +718,29 @@ class LlvmCodeGen {
   /// always present in the output, except "+" is flipped to "-" for the disabled
   /// attributes. E.g. if 'cpu_attrs' is {"+x", "+y", "-z"} and the whitelist is
   /// {"x", "z"}, returns {"+x", "-y", "-z"}.
-  static std::vector<std::string> ApplyCpuAttrWhitelist(
-      const std::vector<std::string>& cpu_attrs);
+  static std::unordered_set<std::string> ApplyCpuAttrWhitelist(
+      const std::unordered_set<std::string>& cpu_attrs);
 
   /// Whether InitializeLlvm() has been called.
   static bool llvm_initialized_;
 
   /// Host CPU name and attributes, filled in by InitializeLlvm().
   static std::string cpu_name_;
-  static std::vector<std::string> cpu_attrs_;
+  /// The cpu_attrs_ should not be modified during the execution except for tests.
+  static std::unordered_set<std::string> cpu_attrs_;
 
   /// Value of "target-features" attribute to be set on all IR functions. Derived from
-  /// 'cpu_attrs_'. Using a consistent value for this attribute among hand-crafted IR
-  /// and cross-compiled functions allow them to be inlined into each other.
+  /// 'cpu_attrs_'. Using a consistent value for this attribute among
+  /// hand-crafted IR and cross-compiled functions allow them to be inlined into each
+  /// other.
   static std::string target_features_attr_;
 
+  /// Mapping between CpuInfo flags and the corresponding strings.
+  /// The key is mapped to the string as follows:
+  /// CpuInfo flag -> enabled feature.
+  /// Bitwise negation of CpuInfo flag -> disabled feature.
+  const static std::map<int64_t, std::string> cpu_flag_mappings_;
+
   /// A global shared call graph for all IR functions in the main module.
   /// Used for determining dependencies when materializing IR functions.
   static CodegenCallGraph shared_call_graph_;

http://git-wip-us.apache.org/repos/asf/impala/blob/514dbb79/be/src/exec/filter-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index 5c39ff9..08cd666 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -388,7 +388,7 @@ Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_exp
 
     // Call Insert() on the bloom filter.
     llvm::Function* insert_bloom_filter_fn;
-    if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
+    if (LlvmCodeGen::IsCPUFeatureEnabled(CpuInfo::AVX2)) {
       insert_bloom_filter_fn =
           codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_AVX2, false);
     } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/514dbb79/be/src/util/cpu-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.cc b/be/src/util/cpu-info.cc
index 815e0d5..e486479 100644
--- a/be/src/util/cpu-info.cc
+++ b/be/src/util/cpu-info.cc
@@ -68,6 +68,14 @@ void WarnIfFileNotEqual(
 
 namespace impala {
 
+const int64_t CpuInfo::SSSE3;
+const int64_t CpuInfo::SSE4_1;
+const int64_t CpuInfo::SSE4_2;
+const int64_t CpuInfo::POPCNT;
+const int64_t CpuInfo::AVX;
+const int64_t CpuInfo::AVX2;
+const int64_t CpuInfo::PCLMULQDQ;
+
 bool CpuInfo::initialized_ = false;
 int64_t CpuInfo::hardware_flags_ = 0;
 int64_t CpuInfo::original_hardware_flags_;


[5/5] impala git commit: IMPALA-3040: Remove cache directives if a partition is dropped externally

Posted by bh...@apache.org.
IMPALA-3040: Remove cache directives if a partition is dropped externally

HdfsTable.dropPartition() doesn't uncache the partition right now. If
the partition is dropped from Hive and refreshed in Impala, the
partition will be removed from the catalog but the cache directive
remains. Because Impala directly uses HMS client to drop a
table/database, the cache directive won't be removed even if the table
is dropped in Impala, if the backgroud loading is run concurrenty with
the HMS client RPC call. This patch removes the cache directive in
dropPartition() if the partition is removed from HMS.

Change-Id: Id7701a499405e961456adea63f3592b43bd69170
Reviewed-on: http://gerrit.cloudera.org:8080/10792
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Tianyi Wang <tw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ac4acf1b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ac4acf1b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ac4acf1b

Branch: refs/heads/master
Commit: ac4acf1b77ccad95528741c255834d8ccdb84518
Parents: 8d7f638
Author: Tianyi Wang <ti...@apache.org>
Authored: Tue Jul 3 14:51:54 2018 -0700
Committer: Tianyi Wang <tw...@cloudera.com>
Committed: Tue Jul 24 17:59:41 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/HdfsTable.java    | 40 +++++++++++++++-----
 .../impala/service/CatalogOpExecutor.java       |  3 --
 tests/query_test/test_hdfs_caching.py           | 19 ++++++++++
 3 files changed, 49 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ac4acf1b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 957b1f9..b4bd707 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -59,6 +59,7 @@ import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.Reference;
@@ -1150,8 +1151,10 @@ public class HdfsTable extends Table implements FeFsTable {
   /**
    * Drops a partition and updates partition column statistics. Returns the
    * HdfsPartition that was dropped or null if the partition does not exist.
+   * If removeCacheDirective = true, any cache directive on the partition is removed.
    */
-  private HdfsPartition dropPartition(HdfsPartition partition) {
+  private HdfsPartition dropPartition(HdfsPartition partition,
+      boolean removeCacheDirective) {
     if (partition == null) return null;
     fileMetadataStats_.totalFileBytes -= partition.getSize();
     fileMetadataStats_.numFiles -= partition.getNumFileDescriptors();
@@ -1160,6 +1163,14 @@ public class HdfsTable extends Table implements FeFsTable {
     Long partitionId = partition.getId();
     partitionMap_.remove(partitionId);
     nameToPartitionMap_.remove(partition.getPartitionName());
+    if (removeCacheDirective && partition.isMarkedCached()) {
+      try {
+        HdfsCachingUtil.removePartitionCacheDirective(partition);
+      } catch (ImpalaException e) {
+        LOG.error("Unable to remove the cache directive on table " + getFullName() +
+            ", partition " + partition.getPartitionName() + ": ", e);
+      }
+    }
     if (!isStoredInImpaladCatalogCache()) return partition;
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
       ColumnStats stats = getColumns().get(i).getStats();
@@ -1185,20 +1196,29 @@ public class HdfsTable extends Table implements FeFsTable {
     return partition;
   }
 
+  private HdfsPartition dropPartition(HdfsPartition partition) {
+    return dropPartition(partition, true);
+  }
+
   /**
    * Drops the given partitions from this table. Cleans up its metadata from all the
    * mappings used to speed up partition pruning/lookup. Also updates partitions column
    * statistics. Returns the list of partitions that were dropped.
    */
-  public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions) {
+  public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions,
+      boolean removeCacheDirective) {
     ArrayList<HdfsPartition> droppedPartitions = Lists.newArrayList();
     for (HdfsPartition partition: partitions) {
-      HdfsPartition hdfsPartition = dropPartition(partition);
+      HdfsPartition hdfsPartition = dropPartition(partition, removeCacheDirective);
       if (hdfsPartition != null) droppedPartitions.add(hdfsPartition);
     }
     return droppedPartitions;
   }
 
+  public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions) {
+    return dropPartitions(partitions, true);
+  }
+
   /**
    * Update the prototype partition used when creating new partitions for
    * this table. New partitions will inherit storage properties from the
@@ -1360,16 +1380,15 @@ public class HdfsTable extends Table implements FeFsTable {
     Map<Path, List<HdfsPartition>> partitionsToUpdateFileMdByPath = Maps.newHashMap();
     // Partitions that need to be dropped and recreated from scratch
     List<HdfsPartition> dirtyPartitions = Lists.newArrayList();
-    // Partitions that need to be removed from this table. That includes dirty
-    // partitions as well as partitions that were removed from the Hive Metastore.
-    List<HdfsPartition> partitionsToRemove = Lists.newArrayList();
+    // Partitions removed from the Hive Metastore.
+    List<HdfsPartition> removedPartitions = Lists.newArrayList();
     // Identify dirty partitions that need to be loaded from the Hive Metastore and
     // partitions that no longer exist in the Hive Metastore.
     for (HdfsPartition partition: partitionMap_.values()) {
       // Remove partitions that don't exist in the Hive Metastore. These are partitions
       // that were removed from HMS using some external process, e.g. Hive.
       if (!msPartitionNames.contains(partition.getPartitionName())) {
-        partitionsToRemove.add(partition);
+        removedPartitions.add(partition);
       }
       if (partition.isDirty()) {
         // Dirty partitions are updated by removing them from table's partition
@@ -1391,8 +1410,9 @@ public class HdfsTable extends Table implements FeFsTable {
       Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
       partitionNames.add(partition.getPartitionName());
     }
-    partitionsToRemove.addAll(dirtyPartitions);
-    dropPartitions(partitionsToRemove);
+    dropPartitions(removedPartitions);
+    // dirtyPartitions are reloaded and hence cache directives are not dropped.
+    dropPartitions(dirtyPartitions, false);
     // Load dirty partitions from Hive Metastore
     loadPartitionsFromMetastore(dirtyPartitions, client);
 
@@ -2146,7 +2166,7 @@ public class HdfsTable extends Table implements FeFsTable {
     refreshPartitionFileMetadata(refreshedPartition);
     Preconditions.checkArgument(oldPartition == null
         || HdfsPartition.KV_COMPARATOR.compare(oldPartition, refreshedPartition) == 0);
-    dropPartition(oldPartition);
+    dropPartition(oldPartition, false);
     addPartition(refreshedPartition);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/ac4acf1b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 39cc108..dc19e9f 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2270,9 +2270,6 @@ public class CatalogOpExecutor {
           msClient.getHiveClient().dropPartition(tableName.getDb(), tableName.getTbl(),
               part.getPartitionValuesAsStrings(true), dropOptions);
           ++numTargetedPartitions;
-          if (part.isMarkedCached()) {
-            HdfsCachingUtil.removePartitionCacheDirective(part);
-          }
         } catch (NoSuchObjectException e) {
           if (!ifExists) {
             throw new ImpalaRuntimeException(

http://git-wip-us.apache.org/repos/asf/impala/blob/ac4acf1b/tests/query_test/test_hdfs_caching.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_hdfs_caching.py b/tests/query_test/test_hdfs_caching.py
index f16a4a4..16940a0 100644
--- a/tests/query_test/test_hdfs_caching.py
+++ b/tests/query_test/test_hdfs_caching.py
@@ -274,6 +274,25 @@ class TestHdfsCachingDdl(ImpalaTestSuite):
     drop_cache_directives_for_path(
         "/test-warehouse/cachedb.db/cached_tbl_reload_part/j=2")
 
+  @pytest.mark.execute_serially
+  def test_external_drop(self):
+    """IMPALA-3040: Tests that dropping a partition in Hive leads to the removal of the
+       cache directive after a refresh statement in Impala."""
+    num_entries_pre = get_num_cache_requests()
+    self.client.execute("use cachedb")
+    self.client.execute("create table test_external_drop_tbl (i int) partitioned by "
+                        "(j int) cached in 'testPool'")
+    self.client.execute("insert into test_external_drop_tbl (i,j) select 1, 2")
+    # 1 directive for the table and 1 directive for the partition.
+    assert num_entries_pre + 2 == get_num_cache_requests()
+    self.hive_client.drop_partition("cachedb", "test_external_drop_tbl", ["2"], True)
+    self.client.execute("refresh test_external_drop_tbl")
+    # The directive on the partition is removed.
+    assert num_entries_pre + 1 == get_num_cache_requests()
+    self.client.execute("drop table test_external_drop_tbl")
+    # We want to see the number of cached entities return to the original count.
+    assert num_entries_pre == get_num_cache_requests()
+
 def drop_cache_directives_for_path(path):
   """Drop the cache directive for a given path"""
   rc, stdout, stderr = exec_process("hdfs cacheadmin -removeDirectives -path %s" % path)


[4/5] impala git commit: IMPALA-7212: Removes --use_krpc flag and remove old DataStream services

Posted by bh...@apache.org.
IMPALA-7212: Removes --use_krpc flag and remove old DataStream services

This change removes the flag --use_krpc which allows users
to fall back to using Thrift based implementation of DataStream
services. This flag was originally added during development of
IMPALA-2567. It has served its purpose.

As we port more ImpalaInternalServices to use KRPC, it's becoming
increasingly burdensome to maintain parallel implementation of the
RPC handlers. Therefore, going forward, KRPC is always enabled.
This change removes the Thrift based implemenation of DataStreamServices
and also simplifies some of the tests which were skipped when KRPC
is disabled.

Testing done: core debug build.

Change-Id: Icfed200751508478a3d728a917448f2dabfc67c3
Reviewed-on: http://gerrit.cloudera.org:8080/10835
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8d7f6386
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8d7f6386
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8d7f6386

Branch: refs/heads/master
Commit: 8d7f63865405c50981647ae3198bd4d39b465bf8
Parents: f9e7d93
Author: Michael Ho <kw...@cloudera.com>
Authored: Tue Jun 26 00:56:02 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 24 02:36:50 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc               |   3 +-
 be/src/exec/data-sink.cc                    |  16 +-
 be/src/exec/exchange-node.cc                |   8 +-
 be/src/exec/exchange-node.h                 |   4 +-
 be/src/rpc/authentication.cc                |   1 -
 be/src/rpc/rpc-mgr-kerberized-test.cc       |   3 -
 be/src/rpc/thrift-server-test.cc            |   3 -
 be/src/runtime/CMakeLists.txt               |   3 -
 be/src/runtime/backend-client.h             |  15 -
 be/src/runtime/data-stream-mgr-base.h       |  60 ---
 be/src/runtime/data-stream-mgr.h            | 241 ----------
 be/src/runtime/data-stream-recvr-base.h     |  60 ---
 be/src/runtime/data-stream-recvr.cc         | 366 ---------------
 be/src/runtime/data-stream-recvr.h          | 202 --------
 be/src/runtime/data-stream-sender.cc        | 563 -----------------------
 be/src/runtime/data-stream-sender.h         | 158 -------
 be/src/runtime/data-stream-test.cc          | 224 ++-------
 be/src/runtime/exec-env.cc                  |  56 +--
 be/src/runtime/exec-env.h                   |  12 +-
 be/src/runtime/fragment-instance-state.cc   |   6 +-
 be/src/runtime/krpc-data-stream-mgr.cc      |   6 +-
 be/src/runtime/krpc-data-stream-mgr.h       |  11 +-
 be/src/runtime/krpc-data-stream-recvr.cc    |   1 -
 be/src/runtime/krpc-data-stream-recvr.h     |   4 +-
 be/src/runtime/runtime-state.cc             |   6 +-
 be/src/runtime/runtime-state.h              |   4 +-
 be/src/scheduling/scheduler.cc              |  22 +-
 be/src/service/data-stream-service.cc       |   4 +-
 be/src/service/impala-internal-service.cc   |   9 -
 be/src/service/impala-internal-service.h    |   2 -
 be/src/service/impala-server.cc             |  38 +-
 be/src/service/impala-server.h              |   4 -
 bin/run-all-tests.sh                        |  13 -
 bin/start-impala-cluster.py                 |   5 -
 common/thrift/ImpalaInternalService.thrift  |  32 --
 tests/common/custom_cluster_test_suite.py   |   3 -
 tests/common/skip.py                        |   4 -
 tests/common/test_skip.py                   |  39 --
 tests/conftest.py                           |   4 -
 tests/custom_cluster/test_krpc_mem_usage.py |   1 -
 tests/custom_cluster/test_krpc_metrics.py   |   1 -
 tests/custom_cluster/test_rpc_exception.py  |  12 -
 tests/query_test/test_codegen.py            |   1 -
 tests/webserver/test_web_pages.py           |   1 -
 44 files changed, 88 insertions(+), 2143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 6130510..c6df084 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -37,7 +37,7 @@ DEFINE_string(hostname, "", "Hostname to use for this daemon, also used as part
 
 DEFINE_int32(be_port, 22000,
     "port on which thrift based ImpalaInternalService is exported");
-DEFINE_int32_hidden(krpc_port, 27000,
+DEFINE_int32(krpc_port, 27000,
     "port on which KRPC based ImpalaInternalService is exported");
 
 // Kerberos is enabled if and only if principal is set.
@@ -258,3 +258,4 @@ REMOVED_FLAG(use_statestore);
 REMOVED_FLAG(use_kudu_kinit);
 REMOVED_FLAG(disable_admission_control);
 REMOVED_FLAG(disable_mem_pools);
+REMOVED_FLAG(use_krpc);

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 2ca0019..5e0fc3c 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -31,14 +31,12 @@
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/strings/substitute.h"
-#include "runtime/data-stream-sender.h"
 #include "runtime/krpc-data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "util/container-util.h"
 
 #include "common/names.h"
 
-DECLARE_bool(use_krpc);
 DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024,
     "(Advanced) Max size in bytes which a row batch in a data stream sender's channel "
     "can accumulate before the row batch is sent over the wire.");
@@ -66,16 +64,10 @@ Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx,
     case TDataSinkType::DATA_STREAM_SINK:
       if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink.");
 
-      if (FLAGS_use_krpc) {
-        *sink = pool->Add(new KrpcDataStreamSender(fragment_instance_ctx.sender_id,
-            row_desc, thrift_sink.stream_sink, fragment_ctx.destinations,
-            FLAGS_data_stream_sender_buffer_size, state));
-      } else {
-        // TODO: figure out good buffer size based on size of output row
-        *sink = pool->Add(new DataStreamSender(fragment_instance_ctx.sender_id, row_desc,
-            thrift_sink.stream_sink, fragment_ctx.destinations,
-            FLAGS_data_stream_sender_buffer_size, state));
-      }
+      // TODO: figure out good buffer size based on size of output row
+      *sink = pool->Add(new KrpcDataStreamSender(fragment_instance_ctx.sender_id,
+          row_desc, thrift_sink.stream_sink, fragment_ctx.destinations,
+          FLAGS_data_stream_sender_buffer_size, state));
       break;
     case TDataSinkType::TABLE_SINK:
       if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 8884f30..07511a7 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -20,11 +20,11 @@
 #include <boost/scoped_ptr.hpp>
 
 #include "exprs/scalar-expr.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/data-stream-recvr.h"
-#include "runtime/runtime-state.h"
-#include "runtime/row-batch.h"
 #include "runtime/exec-env.h"
+#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-recvr.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/time.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/exec/exchange-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index 64fa9fe..bdc52c6 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -26,7 +26,7 @@
 
 namespace impala {
 
-class DataStreamRecvrBase;
+class KrpcDataStreamRecvr;
 class RowBatch;
 class ScalarExpr;
 class TupleRowComparator;
@@ -76,7 +76,7 @@ class ExchangeNode : public ExecNode {
   /// The underlying DataStreamRecvrBase instance. Ownership is shared between this
   /// exchange node instance and the DataStreamMgr used to create the receiver.
   /// stream_recvr_->Close() must be called before this instance is destroyed.
-  std::shared_ptr<DataStreamRecvrBase> stream_recvr_;
+  std::shared_ptr<KrpcDataStreamRecvr> stream_recvr_;
 
   /// our input rows are a prefix of the rows we produce
   RowDescriptor input_row_desc_;

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/rpc/authentication.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index 461f155..c22ab88 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -65,7 +65,6 @@ using namespace apache::thrift;
 using namespace boost::filesystem;   // for is_regular()
 using namespace strings;
 
-DECLARE_bool(use_krpc);
 DECLARE_string(keytab_file);
 DECLARE_string(principal);
 DECLARE_string(be_principal);

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
index 0f1f3bb..c6b95c8 100644
--- a/be/src/rpc/rpc-mgr-kerberized-test.cc
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -18,8 +18,6 @@
 #include "rpc/rpc-mgr-test-base.h"
 #include "service/fe-support.h"
 
-DECLARE_bool(use_krpc);
-
 DECLARE_string(be_principal);
 DECLARE_string(hostname);
 DECLARE_string(principal);
@@ -38,7 +36,6 @@ class RpcMgrKerberizedTest :
     public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
 
   virtual void SetUp() override {
-    FLAGS_use_krpc = true;
     FLAGS_principal = "dummy-service/host@realm";
     FLAGS_be_principal = strings::Substitute("$0@$1", kdc_principal, kdc_realm);
     ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index 4f97237..af867de 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -35,8 +35,6 @@ using namespace strings;
 using namespace apache::thrift;
 using apache::thrift::transport::SSLProtocol;
 
-DECLARE_bool(use_krpc);
-
 DECLARE_string(principal);
 DECLARE_string(be_principal);
 DECLARE_string(ssl_client_ca_certificate);
@@ -104,7 +102,6 @@ class ThriftKerberizedParamsTest :
 
   virtual void SetUp() override {
     KerberosSwitch k = GetParam();
-    FLAGS_use_krpc = false;
     if (k == KERBEROS_OFF) {
       FLAGS_principal.clear();
       FLAGS_be_principal.clear();

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index e09b27c..f67b8fe 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -32,9 +32,6 @@ add_library(Runtime
   client-cache.cc
   coordinator.cc
   coordinator-backend-state.cc
-  data-stream-mgr.cc
-  data-stream-recvr.cc
-  data-stream-sender.cc
   debug-options.cc
   descriptors.cc
   dml-exec-state.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/backend-client.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index 136a71e..0e35999 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -75,21 +75,6 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
     ImpalaInternalServiceClient::recv_CancelQueryFInstances(_return);
   }
 
-  void TransmitData(TTransmitDataResult& _return, const TTransmitDataParams& params,
-      bool* send_done) {
-    DCHECK(!*send_done);
-    FAULT_INJECTION_SEND_RPC_EXCEPTION(1024);
-    if (transmit_csw_ != NULL) {
-      SCOPED_CONCURRENT_COUNTER(transmit_csw_);
-      ImpalaInternalServiceClient::send_TransmitData(params);
-    } else {
-      ImpalaInternalServiceClient::send_TransmitData(params);
-    }
-    *send_done = true;
-    FAULT_INJECTION_RECV_RPC_EXCEPTION(1024);
-    ImpalaInternalServiceClient::recv_TransmitData(_return);
-  }
-
   /// Callers of TransmitData() should provide their own counter to measure the data
   /// transmission time.
   void SetTransmitDataCounter(RuntimeProfile::ConcurrentTimerCounter* csw) {

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h b/be/src/runtime/data-stream-mgr-base.h
deleted file mode 100644
index 6f1ec78..0000000
--- a/be/src/runtime/data-stream-mgr-base.h
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_RUNTIME_DATA_STREAM_MGR_BASE_H
-#define IMPALA_RUNTIME_DATA_STREAM_MGR_BASE_H
-
-#include "common/status.h"
-#include "runtime/bufferpool/buffer-pool.h"
-#include "runtime/descriptors.h"  // for PlanNodeId
-#include "util/aligned-new.h"
-
-namespace impala {
-
-class DataStreamRecvrBase;
-class MemTracker;
-class RuntimeProfile;
-class RuntimeState;
-class TRowBatch;
-class TUniqueId;
-
-/// Interface for a singleton class which manages all incoming data streams at a backend
-/// node.
-/// TODO: This is a temporary pure virtual base class that defines the basic interface for
-/// 2 parallel implementations of the DataStreamMgrBase, one each for Thrift and KRPC.
-/// Remove this in favor of the KRPC implementation when possible.
-class DataStreamMgrBase : public CacheLineAligned {
- public:
-  DataStreamMgrBase() {}
-
-  virtual ~DataStreamMgrBase() { }
-
-  /// Create a receiver for a specific fragment_instance_id/node_id destination;
-  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
-      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
-      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-      MemTracker* parent_tracker, BufferPool::ClientHandle* client = nullptr) = 0;
-
-  /// Closes all receivers registered for fragment_instance_id immediately.
-  virtual void Cancel(const TUniqueId& fragment_instance_id) = 0;
-
-};
-
-}
-
-#endif /* IMPALA_RUNTIME_DATA_STREAM_MGR_BASE_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
deleted file mode 100644
index f37b1b1..0000000
--- a/be/src/runtime/data-stream-mgr.h
+++ /dev/null
@@ -1,241 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_RUNTIME_DATA_STREAM_MGR_H
-#define IMPALA_RUNTIME_DATA_STREAM_MGR_H
-
-#include <list>
-#include <set>
-#include <boost/thread/mutex.hpp>
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-
-#include "common/status.h"
-#include "common/object-pool.h"
-#include "runtime/data-stream-mgr-base.h"
-#include "runtime/descriptors.h"  // for PlanNodeId
-#include "util/metrics.h"
-#include "util/promise.h"
-#include "util/runtime-profile.h"
-#include "gen-cpp/Types_types.h"  // for TUniqueId
-
-namespace impala {
-
-class DescriptorTbl;
-class DataStreamRecvr;
-class RowBatch;
-class RuntimeState;
-class TRowBatch;
-
-/// Singleton class which manages all incoming data streams at a backend node. It
-/// provides both producer and consumer functionality for each data stream.
-/// - ImpalaBackend service threads use this to add incoming data to streams
-///   in response to TransmitData rpcs (AddData()) or to signal end-of-stream conditions
-///   (CloseSender()).
-/// - Exchange nodes extract data from an incoming stream via a DataStreamRecvr,
-///   which is created with CreateRecvr().
-//
-/// DataStreamMgr also allows asynchronous cancellation of streams via Cancel()
-/// which unblocks all DataStreamRecvr::GetBatch() calls that are made on behalf
-/// of the cancelled fragment id.
-///
-/// Exposes three metrics:
-///  'senders-blocked-on-recvr-creation' - currently blocked senders.
-///  'total-senders-blocked-on-recvr-creation' - total number of blocked senders over
-///  time.
-///  'total-senders-timedout-waiting-for-recvr-creation' - total number of senders that
-///  timed-out while waiting for a receiver.
-///
-/// TODO: The recv buffers used in DataStreamRecvr should count against
-/// per-query memory limits.
-class DataStreamMgr : public DataStreamMgrBase {
- public:
-  DataStreamMgr(MetricGroup* metrics);
-  virtual ~DataStreamMgr() override;
-
-  /// Create a receiver for a specific fragment_instance_id/node_id destination;
-  /// If is_merging is true, the receiver maintains a separate queue of incoming row
-  /// batches for each sender and merges the sorted streams from each sender into a
-  /// single stream. 'parent_tracker' is the MemTracker of the exchange node which owns
-  /// this receiver. It's the parent of the MemTracker of the newly created receiver.
-  /// Ownership of the receiver is shared between this DataStream mgr instance and the
-  /// caller. 'client' is the BufferPool's client handle for allocating buffers.
-  /// It's owned by the parent exchange node.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
-      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
-      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-      MemTracker* parent_tracker, BufferPool::ClientHandle* client) override;
-
-  /// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id
-  /// if the recvr has not been cancelled. sender_id identifies the sender instance
-  /// from which the data came.
-  /// The call blocks if this ends up pushing the stream over its buffering limit;
-  /// it unblocks when the consumer removed enough data to make space for
-  /// row_batch.
-  /// TODO: enforce per-sender quotas (something like 200% of buffer_size/#senders),
-  /// so that a single sender can't flood the buffer and stall everybody else.
-  /// Returns OK if successful, error status otherwise.
-  Status AddData(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
-                 const TRowBatch& thrift_batch, int sender_id);
-
-  /// Notifies the recvr associated with the fragment/node id that the specified
-  /// sender has closed.
-  /// Returns OK if successful, error status otherwise.
-  Status CloseSender(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
-      int sender_id);
-
-  /// Closes all receivers registered for fragment_instance_id immediately.
-  void Cancel(const TUniqueId& fragment_instance_id) override;
-
- private:
-  friend class DataStreamRecvr;
-
-  /// Owned by the metric group passed into the constructor
-  MetricGroup* metrics_;
-
-  /// Current number of senders waiting for a receiver to register
-  IntGauge* num_senders_waiting_;
-
-  /// Total number of senders that have ever waited for a receiver to register
-  IntCounter* total_senders_waited_;
-
-  /// Total number of senders that timed-out waiting for a receiver to register
-  IntCounter* num_senders_timedout_;
-
-  /// protects all fields below
-  boost::mutex lock_;
-
-  /// map from hash value of fragment instance id/node id pair to stream receivers;
-  /// Ownership of the stream revcr is shared between this instance and the caller of
-  /// CreateRecvr().
-  /// we don't want to create a map<pair<TUniqueId, PlanNodeId>, DataStreamRecvr*>,
-  /// because that requires a bunch of copying of ids for lookup
-  typedef boost::unordered_multimap<uint32_t,
-      std::shared_ptr<DataStreamRecvr>> RecvrMap;
-  RecvrMap receiver_map_;
-
-  /// (Fragment instance id, Plan node id) pair that uniquely identifies a stream.
-  typedef std::pair<impala::TUniqueId, PlanNodeId> RecvrId;
-
-  /// Less-than ordering for RecvrIds.
-  struct ComparisonOp {
-    bool operator()(const RecvrId& a, const RecvrId& b) {
-      if (a.first.hi < b.first.hi) {
-        return true;
-      } else if (a.first.hi > b.first.hi) {
-        return false;
-      } else if (a.first.lo < b.first.lo) {
-        return true;
-      } else if (a.first.lo > b.first.lo) {
-        return false;
-      }
-      return a.second < b.second;
-    }
-  };
-
-  /// Ordered set of receiver IDs so that we can easily find all receivers for a given
-  /// fragment (by starting at (fragment instance id, 0) and iterating until the fragment
-  /// instance id changes), which is required for cancellation of an entire fragment.
-  ///
-  /// There is one entry in fragment_recvr_set_ for every entry in receiver_map_.
-  typedef std::set<RecvrId, ComparisonOp> FragmentRecvrSet;
-  FragmentRecvrSet fragment_recvr_set_;
-
-  /// Return the receiver for given fragment_instance_id/node_id, or NULL if not found. If
-  /// 'acquire_lock' is false, assumes lock_ is already being held and won't try to
-  /// acquire it.
-  std::shared_ptr<DataStreamRecvr> FindRecvr(const TUniqueId& fragment_instance_id,
-      PlanNodeId node_id, bool acquire_lock = true);
-
-  /// Calls FindRecvr(), but if NULL is returned, wait for up to
-  /// FLAGS_datastream_sender_timeout_ms for the receiver to be registered.  Senders may
-  /// initialise and start sending row batches before a receiver is ready. To accommodate
-  /// this, we allow senders to establish a rendezvous between them and the receiver. When
-  /// the receiver arrives, it triggers the rendezvous, and all waiting senders can
-  /// proceed. A sender that waits for too long (120s by default) will eventually time out
-  /// and abort. The output parameter 'already_unregistered' distinguishes between the two
-  /// cases in which this method returns NULL:
-  ///
-  /// 1. *already_unregistered == true: the receiver had previously arrived and was
-  /// already closed
-  ///
-  /// 2. *already_unregistered == false: the receiver has yet to arrive when this method
-  /// returns, and the timeout has expired
-  std::shared_ptr<DataStreamRecvr> FindRecvrOrWait(
-      const TUniqueId& fragment_instance_id, PlanNodeId node_id,
-      bool* already_unregistered);
-
-  /// Remove receiver block for fragment_instance_id/node_id from the map.
-  Status DeregisterRecvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id);
-
-  inline uint32_t GetHashValue(const TUniqueId& fragment_instance_id, PlanNodeId node_id);
-
-  /// The coordination primitive used to signal the arrival of a waited-for receiver
-  typedef Promise<std::shared_ptr<DataStreamRecvr>> RendezvousPromise;
-
-  /// A reference-counted promise-wrapper used to coordinate between senders and
-  /// receivers. The ref_count field tracks the number of senders waiting for the arrival
-  /// of a particular receiver. When ref_count returns to 0, the last sender has ceased
-  /// waiting (either because of a timeout, or because the receiver arrived), and the
-  /// rendezvous can be torn down.
-  ///
-  /// Access is only thread-safe when lock_ is held.
-  struct RefCountedPromise {
-    uint32_t ref_count;
-
-    // Without a conveniently copyable smart ptr, we keep a raw pointer to the promise and
-    // are careful to delete it when ref_count becomes 0.
-    RendezvousPromise* promise;
-
-    void IncRefCount() { ++ref_count; }
-
-    uint32_t DecRefCount() {
-      if (--ref_count == 0) delete promise;
-      return ref_count;
-    }
-
-    RefCountedPromise() : ref_count(0), promise(new RendezvousPromise()) { }
-  };
-
-  /// Map from stream (which identifies a receiver) to a (count, promise) pair that gives
-  /// the number of senders waiting as well as a shared promise whose value is Set() with
-  /// a pointer to the receiver when the receiver arrives. The count is used to detect
-  /// when no receivers are waiting, to initiate clean-up after the fact.
-  ///
-  /// If pending_rendezvous_[X] exists, then receiver_map_[hash(X)] and
-  /// fragment_recvr_set_[X] may exist (and vice versa), as entries are removed from
-  /// pending_rendezvous_ some time after the rendezvous is triggered by the arrival of a
-  /// matching receiver.
-  typedef boost::unordered_map<RecvrId, RefCountedPromise> RendezvousMap;
-  RendezvousMap pending_rendezvous_;
-
-  /// Map from the time, in ms, that a stream should be evicted from closed_stream_cache
-  /// to its RecvrId. Used to evict old streams from cache efficiently. multimap in case
-  /// there are multiple streams with the same eviction time.
-  typedef std::multimap<int64_t, RecvrId> ClosedStreamMap;
-  ClosedStreamMap closed_stream_expirations_;
-
-  /// Cache of recently closed RecvrIds. Used to allow straggling senders to fail fast by
-  /// checking this cache, rather than waiting for the missed-receiver timeout to elapse
-  /// in FindRecvrOrWait().
-  boost::unordered_set<RecvrId> closed_stream_cache_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-recvr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr-base.h b/be/src/runtime/data-stream-recvr-base.h
deleted file mode 100644
index e0d06fe..0000000
--- a/be/src/runtime/data-stream-recvr-base.h
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DATA_STREAM_RECVR_BASE_H
-#define IMPALA_RUNTIME_DATA_STREAM_RECVR_BASE_H
-
-#include "common/status.h"
-
-namespace impala {
-
-class RowBatch;
-class TupleRowComparator;
-
-/// Interface for a single receiver of a m:n data stream.
-/// DataStreamRecvrBase implementations should maintain one or more queues of row batches
-/// received by a DataStreamMgrBase implementation from one or more sender fragment
-/// instances.
-/// TODO: This is a temporary pure virtual base class that defines the basic interface for
-/// 2 parallel implementations of the DataStreamRecvrBase, one each for Thrift and KRPC.
-/// Remove this in favor of the KRPC implementation when possible.
-class DataStreamRecvrBase {
- public:
-  DataStreamRecvrBase() { }
-  virtual ~DataStreamRecvrBase() { }
-
-  /// Returns next row batch in data stream.
-  virtual Status GetBatch(RowBatch** next_batch) = 0;
-
-  virtual void Close() = 0;
-
-  /// Create a SortedRunMerger instance to merge rows from multiple senders according to
-  /// the specified row comparator.
-  virtual Status CreateMerger(const TupleRowComparator& less_than) = 0;
-
-  /// Fill output_batch with the next batch of rows.
-  virtual Status GetNext(RowBatch* output_batch, bool* eos) = 0;
-
-  /// Transfer all resources from the current batches being processed from each sender
-  /// queue to the specified batch.
-  virtual void TransferAllResources(RowBatch* transfer_batch) = 0;
-
-};
-
-}
-
-#endif /* IMPALA_RUNTIME_DATA_STREAM_RECVR_BASE_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
deleted file mode 100644
index c9a9ab9..0000000
--- a/be/src/runtime/data-stream-recvr.cc
+++ /dev/null
@@ -1,366 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <boost/thread/locks.hpp>
-#include <boost/thread/mutex.hpp>
-
-#include "runtime/data-stream-recvr.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
-#include "runtime/row-batch-queue.h"
-#include "runtime/sorted-run-merger.h"
-#include "util/condition-variable.h"
-#include "util/runtime-profile-counters.h"
-#include "util/periodic-counter-updater.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-// Implements a blocking queue of row batches from one or more senders. One queue
-// is maintained per sender if is_merging_ is true for the enclosing receiver, otherwise
-// rows from all senders are placed in the same queue.
-class DataStreamRecvr::SenderQueue {
- public:
-  SenderQueue(DataStreamRecvr* parent_recvr, int num_senders);
-
-  // Return the next batch from this sender queue. Sets the returned batch in cur_batch_.
-  // A returned batch that is not filled to capacity does *not* indicate
-  // end-of-stream.
-  // The call blocks until another batch arrives or all senders close.
-  // their channels. The returned batch is owned by the sender queue. The caller
-  // must acquire data from the returned batch before the next call to GetBatch().
-  Status GetBatch(RowBatch** next_batch);
-
-  // Adds a row batch to this sender queue if this stream has not been cancelled;
-  // blocks if this will make the stream exceed its buffer limit.
-  // If the total size of the batches in this queue would exceed the allowed buffer size,
-  // the queue is considered full and the call blocks until a batch is dequeued.
-  void AddBatch(const TRowBatch& batch);
-
-  // Decrement the number of remaining senders for this queue and signal eos ("new data")
-  // if the count drops to 0. The number of senders will be 1 for a merging
-  // DataStreamRecvr.
-  void DecrementSenders();
-
-  // Set cancellation flag and signal cancellation to receiver and sender. Subsequent
-  // incoming batches will be dropped.
-  void Cancel();
-
-  // Must be called once to cleanup any queued resources.
-  void Close();
-
-  // Returns the current batch from this queue being processed by a consumer.
-  RowBatch* current_batch() const { return current_batch_.get(); }
-
- private:
-  // Receiver of which this queue is a member.
-  DataStreamRecvr* recvr_;
-
-  // protects all subsequent data.
-  mutex lock_;
-
-  // if true, the receiver fragment for this stream got cancelled
-  bool is_cancelled_;
-
-  // number of senders which haven't closed the channel yet
-  // (if it drops to 0, end-of-stream is true)
-  int num_remaining_senders_;
-
-  // signal arrival of new batch or the eos/cancelled condition
-  ConditionVariable data_arrival_cv_;
-
-  // signal removal of data by stream consumer
-  ConditionVariable data_removal__cv_;
-
-  // queue of (batch length, batch) pairs.  The SenderQueue block owns memory to
-  // these batches. They are handed off to the caller via GetBatch.
-  typedef list<pair<int, RowBatch*>> RowBatchQueue;
-  RowBatchQueue batch_queue_;
-
-  // The batch that was most recently returned via GetBatch(), i.e. the current batch
-  // from this queue being processed by a consumer. Is destroyed when the next batch
-  // is retrieved.
-  scoped_ptr<RowBatch> current_batch_;
-
-  // Set to true when the first batch has been received
-  bool received_first_batch_;
-};
-
-DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders)
-  : recvr_(parent_recvr),
-    is_cancelled_(false),
-    num_remaining_senders_(num_senders),
-    received_first_batch_(false) {
-}
-
-Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
-  unique_lock<mutex> l(lock_);
-  // wait until something shows up or we know we're done
-  while (!is_cancelled_ && batch_queue_.empty() && num_remaining_senders_ > 0) {
-    VLOG_ROW << "wait arrival fragment_instance_id="
-             << PrintId(recvr_->fragment_instance_id())
-             << " node=" << recvr_->dest_node_id();
-    // Don't count time spent waiting on the sender as active time.
-    CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
-    CANCEL_SAFE_SCOPED_TIMER(
-        received_first_batch_ ? NULL : recvr_->first_batch_wait_total_timer_,
-        &is_cancelled_);
-    data_arrival_cv_.Wait(l);
-  }
-
-  // cur_batch_ must be replaced with the returned batch.
-  current_batch_.reset();
-  *next_batch = NULL;
-  if (is_cancelled_) return Status::CANCELLED;
-
-  if (batch_queue_.empty()) {
-    DCHECK_EQ(num_remaining_senders_, 0);
-    return Status::OK();
-  }
-
-  received_first_batch_ = true;
-
-  DCHECK(!batch_queue_.empty());
-  RowBatch* result = batch_queue_.front().second;
-  recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
-  VLOG_ROW << "fetched #rows=" << result->num_rows();
-  batch_queue_.pop_front();
-  data_removal__cv_.NotifyOne();
-  current_batch_.reset(result);
-  *next_batch = current_batch_.get();
-  return Status::OK();
-}
-
-void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
-  unique_lock<mutex> l(lock_);
-  if (is_cancelled_) return;
-
-  COUNTER_ADD(recvr_->bytes_received_counter_, RowBatch::GetSerializedSize(thrift_batch));
-  DCHECK_GT(num_remaining_senders_, 0);
-
-  // if there's something in the queue and this batch will push us over the
-  // buffer limit we need to wait until the batch gets drained.
-  // Note: It's important that we enqueue thrift_batch regardless of buffer limit if
-  // the queue is currently empty. In the case of a merging receiver, batches are
-  // received from a specific queue based on data order, and the pipeline will stall
-  // if the merger is waiting for data from an empty queue that cannot be filled because
-  // the limit has been reached.
-  int64_t batch_size = RowBatch::GetDeserializedSize(thrift_batch);
-  while (!batch_queue_.empty() && recvr_->ExceedsLimit(batch_size) && !is_cancelled_) {
-    CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_total_timer_, &is_cancelled_);
-    VLOG_ROW << " wait removal: empty=" << (batch_queue_.empty() ? 1 : 0)
-             << " #buffered=" << recvr_->num_buffered_bytes_.Load()
-             << " batch_size=" << batch_size << "\n";
-
-    // We only want one thread running the timer at any one time. Only
-    // one thread may lock the try_lock, and that 'winner' starts the
-    // scoped timer.
-    bool got_timer_lock = false;
-    {
-      try_mutex::scoped_try_lock timer_lock(recvr_->buffer_wall_timer_lock_);
-      if (timer_lock) {
-        CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_wall_timer_, &is_cancelled_);
-        data_removal__cv_.Wait(l);
-        got_timer_lock = true;
-      } else {
-        data_removal__cv_.Wait(l);
-        got_timer_lock = false;
-      }
-    }
-    // If we had the timer lock, wake up another writer to make sure
-    // that they (if no-one else) starts the timer. The guarantee is
-    // that if no thread has the try_lock, the thread that we wake up
-    // here will obtain it and run the timer.
-    //
-    // We must have given up the try_lock by this point, otherwise the
-    // woken thread might not successfully take the lock once it has
-    // woken up. (In fact, no other thread will run in AddBatch until
-    // this thread exits because of mutual exclusion around lock_, but
-    // it's good not to rely on that fact).
-    //
-    // The timer may therefore be an underestimate by the amount of
-    // time it takes this thread to finish (and yield lock_) and the
-    // notified thread to be woken up and to acquire the try_lock. In
-    // practice, this time is small relative to the total wait time.
-    if (got_timer_lock) data_removal__cv_.NotifyOne();
-  }
-
-  if (!is_cancelled_) {
-    RowBatch* batch = NULL;
-    {
-      SCOPED_TIMER(recvr_->deserialize_row_batch_timer_);
-      // Note: if this function makes a row batch, the batch *must* be added
-      // to batch_queue_. It is not valid to create the row batch and destroy
-      // it in this thread.
-      batch = new RowBatch(recvr_->row_desc_, thrift_batch, recvr_->mem_tracker());
-    }
-    VLOG_ROW << "added #rows=" << batch->num_rows()
-             << " batch_size=" << batch_size << "\n";
-    batch_queue_.push_back(make_pair(batch_size, batch));
-    recvr_->num_buffered_bytes_.Add(batch_size);
-    data_arrival_cv_.NotifyOne();
-  }
-}
-
-void DataStreamRecvr::SenderQueue::DecrementSenders() {
-  lock_guard<mutex> l(lock_);
-  DCHECK_GT(num_remaining_senders_, 0);
-  num_remaining_senders_ = max(0, num_remaining_senders_ - 1);
-  VLOG_FILE << "decremented senders: fragment_instance_id="
-            << PrintId(recvr_->fragment_instance_id())
-            << " node_id=" << recvr_->dest_node_id()
-            << " #senders=" << num_remaining_senders_;
-  if (num_remaining_senders_ == 0) data_arrival_cv_.NotifyOne();
-}
-
-void DataStreamRecvr::SenderQueue::Cancel() {
-  {
-    lock_guard<mutex> l(lock_);
-    if (is_cancelled_) return;
-    is_cancelled_ = true;
-    VLOG_QUERY << "cancelled stream: fragment_instance_id_="
-               << PrintId(recvr_->fragment_instance_id())
-               << " node_id=" << recvr_->dest_node_id();
-  }
-  // Wake up all threads waiting to produce/consume batches.  They will all
-  // notice that the stream is cancelled and handle it.
-  data_arrival_cv_.NotifyAll();
-  data_removal__cv_.NotifyAll();
-}
-
-void DataStreamRecvr::SenderQueue::Close() {
-  lock_guard<mutex> l(lock_);
-  // Note that the queue must be cancelled first before it can be closed or we may
-  // risk running into a race which can leak row batches. Please see IMPALA-3034.
-  DCHECK(is_cancelled_);
-  // Delete any batches queued in batch_queue_
-  for (RowBatchQueue::iterator it = batch_queue_.begin();
-      it != batch_queue_.end(); ++it) {
-    delete it->second;
-  }
-  current_batch_.reset();
-}
-
-Status DataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
-  DCHECK(is_merging_);
-  vector<SortedRunMerger::RunBatchSupplierFn> input_batch_suppliers;
-  input_batch_suppliers.reserve(sender_queues_.size());
-
-  // Create the merger that will a single stream of sorted rows.
-  merger_.reset(new SortedRunMerger(less_than, row_desc_, profile_, false));
-
-  for (int i = 0; i < sender_queues_.size(); ++i) {
-    input_batch_suppliers.push_back(
-        bind(mem_fn(&SenderQueue::GetBatch), sender_queues_[i], _1));
-  }
-  RETURN_IF_ERROR(merger_->Prepare(input_batch_suppliers));
-  return Status::OK();
-}
-
-void DataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
-  for (SenderQueue* sender_queue: sender_queues_) {
-    if (sender_queue->current_batch() != NULL) {
-      sender_queue->current_batch()->TransferResourceOwnership(transfer_batch);
-    }
-  }
-}
-
-DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
-    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-    PlanNodeId dest_node_id, int num_senders, bool is_merging, int64_t total_buffer_limit,
-    RuntimeProfile* parent_profile)
-  : mgr_(stream_mgr),
-    fragment_instance_id_(fragment_instance_id),
-    dest_node_id_(dest_node_id),
-    total_buffer_limit_(total_buffer_limit),
-    row_desc_(row_desc),
-    is_merging_(is_merging),
-    num_buffered_bytes_(0),
-    profile_(parent_profile->CreateChild("DataStreamReceiver")) {
-  // Create one queue per sender if is_merging is true.
-  int num_queues = is_merging ? num_senders : 1;
-  sender_queues_.reserve(num_queues);
-  int num_sender_per_queue = is_merging ? 1 : num_senders;
-  for (int i = 0; i < num_queues; ++i) {
-    SenderQueue* queue = sender_queue_pool_.Add(new SenderQueue(this,
-        num_sender_per_queue));
-    sender_queues_.push_back(queue);
-  }
-
-  mem_tracker_.reset(new MemTracker(profile_, -1, "DataStreamRecvr", parent_tracker));
-
-  // Initialize the counters
-  bytes_received_counter_ = ADD_COUNTER(profile_, "BytesReceived", TUnit::BYTES);
-  bytes_received_time_series_counter_ =
-      ADD_TIME_SERIES_COUNTER(profile_, "BytesReceived", bytes_received_counter_);
-  deserialize_row_batch_timer_ = ADD_TIMER(profile_, "DeserializeRowBatchTimer");
-  buffer_full_wall_timer_ = ADD_TIMER(profile_, "SendersBlockedTimer");
-  buffer_full_total_timer_ = ADD_TIMER(profile_, "SendersBlockedTotalTimer(*)");
-  data_arrival_timer_ = profile_->inactive_timer();
-  first_batch_wait_total_timer_ = ADD_TIMER(profile_, "FirstBatchArrivalWaitTime");
-}
-
-Status DataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
-  DCHECK(merger_.get() != NULL);
-  return merger_->GetNext(output_batch, eos);
-}
-
-void DataStreamRecvr::AddBatch(const TRowBatch& thrift_batch, int sender_id) {
-  int use_sender_id = is_merging_ ? sender_id : 0;
-  // Add all batches to the same queue if is_merging_ is false.
-  sender_queues_[use_sender_id]->AddBatch(thrift_batch);
-}
-
-void DataStreamRecvr::RemoveSender(int sender_id) {
-  int use_sender_id = is_merging_ ? sender_id : 0;
-  sender_queues_[use_sender_id]->DecrementSenders();
-}
-
-void DataStreamRecvr::CancelStream() {
-  for (int i = 0; i < sender_queues_.size(); ++i) {
-    sender_queues_[i]->Cancel();
-  }
-}
-
-void DataStreamRecvr::Close() {
-  // Remove this receiver from the DataStreamMgr that created it.
-  const Status status = mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id());
-  if (!status.ok()) {
-    LOG(WARNING) << "Error deregistering receiver: " << status.GetDetail();
-  }
-  mgr_ = NULL;
-  for (int i = 0; i < sender_queues_.size(); ++i) {
-    sender_queues_[i]->Close();
-  }
-  merger_.reset();
-  mem_tracker_->Close();
-  profile_->StopPeriodicCounters();
-}
-
-DataStreamRecvr::~DataStreamRecvr() {
-  DCHECK(mgr_ == NULL) << "Must call Close()";
-}
-
-Status DataStreamRecvr::GetBatch(RowBatch** next_batch) {
-  DCHECK(!is_merging_);
-  DCHECK_EQ(sender_queues_.size(), 1);
-  return sender_queues_[0]->GetBatch(next_batch);
-}
-
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
deleted file mode 100644
index 37e8f70..0000000
--- a/be/src/runtime/data-stream-recvr.h
+++ /dev/null
@@ -1,202 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DATA_STREAM_RECVR_H
-#define IMPALA_RUNTIME_DATA_STREAM_RECVR_H
-
-#include "data-stream-recvr-base.h"
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread/mutex.hpp>
-
-#include "common/atomic.h"
-#include "common/object-pool.h"
-#include "common/status.h"
-#include "gen-cpp/Types_types.h"   // for TUniqueId
-#include "runtime/descriptors.h"
-#include "util/runtime-profile.h"
-
-namespace impala {
-
-class DataStreamMgr;
-class SortedRunMerger;
-class MemTracker;
-class RowBatch;
-class TRowBatch;
-class TupleRowCompare;
-
-/// Single receiver of an m:n data stream.
-/// This is for use by the DataStreamMgr, which is the implementation of the abstract
-/// class DataStreamMgrBase that depends on Thrift.
-/// DataStreamRecvr maintains one or more queues of row batches received by a
-/// DataStreamMgr from one or more sender fragment instances.
-/// Receivers are created via DataStreamMgr::CreateRecvr().
-/// Ownership of a stream recvr is shared between the DataStreamMgr that created it and
-/// the caller of DataStreamMgr::CreateRecvr() (i.e. the exchange node)
-//
-/// The is_merging_ member determines if the recvr merges input streams from different
-/// sender fragment instances according to a specified sort order.
-/// If is_merging_ = false : Only one batch queue is maintained for row batches from all
-/// sender fragment instances. These row batches are returned one at a time via
-/// GetBatch().
-/// If is_merging_ is true : One queue is created for the batches from each distinct
-/// sender. A SortedRunMerger instance must be created via CreateMerger() prior to
-/// retrieving any rows from the receiver. Rows are retrieved from the receiver via
-/// GetNext(RowBatch* output_batch, int limit, bool eos). After the final call to
-/// GetNext(), TransferAllResources() must be called to transfer resources from the input
-/// batches from each sender to the caller's output batch.
-/// The receiver sets deep_copy to false on the merger - resources are transferred from
-/// the input batches from each sender queue to the merger to the output batch by the
-/// merger itself as it processes each run.
-//
-/// DataStreamRecvr::Close() must be called by the caller of CreateRecvr() to remove the
-/// recvr instance from the tracking structure of its DataStreamMgr in all cases.
-class DataStreamRecvr : public DataStreamRecvrBase {
- public:
-  virtual ~DataStreamRecvr() override;
-
-  /// Returns next row batch in data stream; blocks if there aren't any.
-  /// Retains ownership of the returned batch. The caller must acquire data from the
-  /// returned batch before the next call to GetBatch(). A NULL returned batch indicated
-  /// eos. Must only be called if is_merging_ is false.
-  /// TODO: This is currently only exposed to the non-merging version of the exchange.
-  /// Refactor so both merging and non-merging exchange use GetNext(RowBatch*, bool* eos).
-  Status GetBatch(RowBatch** next_batch) override;
-
-  /// Deregister from DataStreamMgr instance, which shares ownership of this instance.
-  void Close() override;
-
-  /// Create a SortedRunMerger instance to merge rows from multiple sender according to the
-  /// specified row comparator. Fetches the first batches from the individual sender
-  /// queues. The exprs used in less_than must have already been prepared and opened.
-  Status CreateMerger(const TupleRowComparator& less_than) override;
-
-  /// Fill output_batch with the next batch of rows obtained by merging the per-sender
-  /// input streams. Must only be called if is_merging_ is true.
-  Status GetNext(RowBatch* output_batch, bool* eos) override;
-
-  /// Transfer all resources from the current batches being processed from each sender
-  /// queue to the specified batch.
-  void TransferAllResources(RowBatch* transfer_batch) override;
-
-  const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
-  PlanNodeId dest_node_id() const { return dest_node_id_; }
-  const RowDescriptor& row_desc() const { return *row_desc_; }
-  MemTracker* mem_tracker() const { return mem_tracker_.get(); }
-
- private:
-  friend class DataStreamMgr;
-  class SenderQueue;
-
-  DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, bool is_merging,
-      int64_t total_buffer_limit, RuntimeProfile* parent_profile);
-
-  /// Add a new batch of rows to the appropriate sender queue, blocking if the queue is
-  /// full. Called from DataStreamMgr.
-  void AddBatch(const TRowBatch& thrift_batch, int sender_id);
-
-  /// Indicate that a particular sender is done. Delegated to the appropriate
-  /// sender queue. Called from DataStreamMgr.
-  void RemoveSender(int sender_id);
-
-  /// Empties the sender queues and notifies all waiting consumers of cancellation.
-  void CancelStream();
-
-  /// Return true if the addition of a new batch of size 'batch_size' would exceed the
-  /// total buffer limit.
-  bool ExceedsLimit(int64_t batch_size) {
-    return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
-  }
-
-  /// DataStreamMgr instance used to create this recvr. (Not owned)
-  DataStreamMgr* mgr_;
-
-  /// Fragment and node id of the destination exchange node this receiver is used by.
-  TUniqueId fragment_instance_id_;
-  PlanNodeId dest_node_id_;
-
-  /// soft upper limit on the total amount of buffering allowed for this stream across
-  /// all sender queues. we stop acking incoming data once the amount of buffered data
-  /// exceeds this value
-  int64_t total_buffer_limit_;
-
-  /// Row schema.
-  const RowDescriptor* row_desc_;
-
-  /// True if this reciver merges incoming rows from different senders. Per-sender
-  /// row batch queues are maintained in this case.
-  bool is_merging_;
-
-  /// total number of bytes held across all sender queues.
-  AtomicInt64 num_buffered_bytes_;
-
-  /// Memtracker for batches in the sender queue(s).
-  boost::scoped_ptr<MemTracker> mem_tracker_;
-
-  /// One or more queues of row batches received from senders. If is_merging_ is true,
-  /// there is one SenderQueue for each sender. Otherwise, row batches from all senders
-  /// are placed in the same SenderQueue. The SenderQueue instances are owned by the
-  /// receiver and placed in sender_queue_pool_.
-  std::vector<SenderQueue*> sender_queues_;
-
-  /// SortedRunMerger used to merge rows from different senders.
-  boost::scoped_ptr<SortedRunMerger> merger_;
-
-  /// Pool of sender queues.
-  ObjectPool sender_queue_pool_;
-
-  /// Runtime profile storing the counters below. Child of 'parent_profile' passed into
-  /// constructor.
-  RuntimeProfile* const profile_;
-
-  /// Number of bytes received
-  RuntimeProfile::Counter* bytes_received_counter_;
-
-  /// Time series of number of bytes received, samples bytes_received_counter_
-  RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
-
-  RuntimeProfile::Counter* deserialize_row_batch_timer_;
-
-  /// Time spent waiting until the first batch arrives across all queues.
-  /// TODO: Turn this into a wall-clock timer.
-  RuntimeProfile::Counter* first_batch_wait_total_timer_;
-
-  /// Total time (summed across all threads) spent waiting for the
-  /// recv buffer to be drained so that new batches can be
-  /// added. Remote plan fragments are blocked for the same amount of
-  /// time.
-  RuntimeProfile::Counter* buffer_full_total_timer_;
-
-  /// Protects access to buffer_full_wall_timer_. We only want one
-  /// thread to be running the timer at any time, and we use this
-  /// try_mutex to enforce this condition. If a thread does not get
-  /// the lock, it continues to execute, but without running the
-  /// timer.
-  boost::try_mutex buffer_wall_timer_lock_;
-
-  /// Wall time senders spend waiting for the recv buffer to have capacity.
-  RuntimeProfile::Counter* buffer_full_wall_timer_;
-
-  /// Total time spent waiting for data to arrive in the recv buffer
-  RuntimeProfile::Counter* data_arrival_timer_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
deleted file mode 100644
index ad500e9..0000000
--- a/be/src/runtime/data-stream-sender.cc
+++ /dev/null
@@ -1,563 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/data-stream-sender.h"
-
-#include <iostream>
-#include <thrift/protocol/TDebugProtocol.h>
-
-#include "common/logging.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/descriptors.h"
-#include "runtime/tuple-row.h"
-#include "runtime/row-batch.h"
-#include "runtime/raw-value.inline.h"
-#include "runtime/runtime-state.h"
-#include "runtime/client-cache.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/backend-client.h"
-#include "util/aligned-new.h"
-#include "util/condition-variable.h"
-#include "util/debug-util.h"
-#include "util/network-util.h"
-#include "util/thread-pool.h"
-#include "rpc/thrift-client.h"
-#include "rpc/thrift-util.h"
-
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-
-#include "common/names.h"
-
-using namespace apache::thrift;
-using namespace apache::thrift::protocol;
-using namespace apache::thrift::transport;
-
-namespace impala {
-
-// A channel sends data asynchronously via calls to TransmitData
-// to a single destination ipaddress/node.
-// It has a fixed-capacity buffer and allows the caller either to add rows to
-// that buffer individually (AddRow()), or circumvent the buffer altogether and send
-// TRowBatches directly (SendBatch()). Either way, there can only be one in-flight RPC
-// at any one time (ie, sending will block if the most recent rpc hasn't finished,
-// which allows the receiver node to throttle the sender by withholding acks).
-// *Not* thread-safe.
-class DataStreamSender::Channel : public CacheLineAligned {
- public:
-  // Create channel to send data to particular ipaddress/port/query/node
-  // combination. buffer_size is specified in bytes and a soft limit on
-  // how much tuple data is getting accumulated before being sent; it only applies
-  // when data is added via AddRow() and not sent directly via SendBatch().
-  Channel(DataStreamSender* parent, const RowDescriptor* row_desc,
-      const TNetworkAddress& destination, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int buffer_size)
-    : parent_(parent),
-      buffer_size_(buffer_size),
-      row_desc_(row_desc),
-      address_(MakeNetworkAddress(destination.hostname, destination.port)),
-      fragment_instance_id_(fragment_instance_id),
-      dest_node_id_(dest_node_id),
-      num_data_bytes_sent_(0),
-      rpc_thread_("DataStreamSender", "SenderThread", 1, 1,
-          bind<void>(mem_fn(&Channel::TransmitData), this, _1, _2), true),
-      rpc_in_flight_(false) {}
-
-  // Initialize channel.
-  // Returns OK if successful, error indication otherwise.
-  Status Init(RuntimeState* state) WARN_UNUSED_RESULT;
-
-  // Copies a single row into this channel's output buffer and flushes buffer
-  // if it reaches capacity.
-  // Returns error status if any of the preceding rpcs failed, OK otherwise.
-  Status ALWAYS_INLINE AddRow(TupleRow* row) WARN_UNUSED_RESULT;
-
-  // Asynchronously sends a row batch.
-  // Returns the status of the most recently finished TransmitData
-  // rpc (or OK if there wasn't one that hasn't been reported yet).
-  Status SendBatch(TRowBatch* batch) WARN_UNUSED_RESULT;
-
-  // Return status of last TransmitData rpc (initiated by the most recent call
-  // to either SendBatch() or SendCurrentBatch()).
-  Status GetSendStatus() WARN_UNUSED_RESULT;
-
-  // Waits for the rpc thread pool to finish the current rpc.
-  void WaitForRpc();
-
-  // Drain and shutdown the rpc thread and free the row batch allocation.
-  void Teardown(RuntimeState* state);
-
-  // Flushes any buffered row batches and sends the EOS RPC to close the channel.
-  Status FlushAndSendEos(RuntimeState* state) WARN_UNUSED_RESULT;
-
-  int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; }
-  TRowBatch* thrift_batch() { return &thrift_batch_; }
-
- private:
-  DataStreamSender* parent_;
-  int buffer_size_;
-
-  const RowDescriptor* row_desc_;
-  TNetworkAddress address_;
-  TUniqueId fragment_instance_id_;
-  PlanNodeId dest_node_id_;
-
-  // the number of TRowBatch.data bytes sent successfully
-  int64_t num_data_bytes_sent_;
-
-  // we're accumulating rows into this batch
-  scoped_ptr<RowBatch> batch_;
-  TRowBatch thrift_batch_;
-
-  // We want to reuse the rpc thread to prevent creating a thread per rowbatch.
-  // TODO: currently we only have one batch in flight, but we should buffer more
-  // batches. This is a bit tricky since the channels share the outgoing batch
-  // pointer we need some mechanism to coordinate when the batch is all done.
-  // TODO: if the order of row batches does not matter, we can consider increasing
-  // the number of threads.
-  ThreadPool<TRowBatch*> rpc_thread_; // sender thread.
-  ConditionVariable rpc_done_cv_;   // signaled when rpc_in_flight_ is set to true.
-  mutex rpc_thread_lock_; // Lock with rpc_done_cv_ protecting rpc_in_flight_
-  bool rpc_in_flight_;  // true if the rpc_thread_ is busy sending.
-
-  Status rpc_status_;  // status of most recently finished TransmitData rpc
-  RuntimeState* runtime_state_;
-
-  // Serialize batch_ into thrift_batch_ and send via SendBatch().
-  // Returns SendBatch() status.
-  Status SendCurrentBatch();
-
-  // Synchronously call TransmitData() on a client from impalad_client_cache and
-  // update rpc_status_ based on return value (or set to error if RPC failed).
-  // Called from a thread from the rpc_thread_ pool.
-  void TransmitData(int thread_id, const TRowBatch*);
-  void TransmitDataHelper(const TRowBatch*);
-
-  // Send RPC and retry waiting for response if get RPC timeout error.
-  Status DoTransmitDataRpc(ImpalaBackendConnection* client,
-      const TTransmitDataParams& params, TTransmitDataResult* res);
-};
-
-Status DataStreamSender::Channel::Init(RuntimeState* state) {
-  RETURN_IF_ERROR(rpc_thread_.Init());
-  runtime_state_ = state;
-  // TODO: figure out how to size batch_
-  int capacity = max(1, buffer_size_ / max(row_desc_->GetRowSize(), 1));
-  batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));
-  return Status::OK();
-}
-
-Status DataStreamSender::Channel::SendBatch(TRowBatch* batch) {
-  VLOG_ROW << "Channel::SendBatch() fragment_instance_id="
-           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
-           << " #rows=" << batch->num_rows;
-  // return if the previous batch saw an error
-  RETURN_IF_ERROR(GetSendStatus());
-  {
-    unique_lock<mutex> l(rpc_thread_lock_);
-    rpc_in_flight_ = true;
-  }
-  if (!rpc_thread_.Offer(batch)) {
-    unique_lock<mutex> l(rpc_thread_lock_);
-    rpc_in_flight_ = false;
-  }
-  return Status::OK();
-}
-
-void DataStreamSender::Channel::TransmitData(int thread_id, const TRowBatch* batch) {
-  DCHECK(rpc_in_flight_);
-  TransmitDataHelper(batch);
-
-  {
-    unique_lock<mutex> l(rpc_thread_lock_);
-    rpc_in_flight_ = false;
-  }
-  rpc_done_cv_.NotifyOne();
-}
-
-void DataStreamSender::Channel::TransmitDataHelper(const TRowBatch* batch) {
-  DCHECK(batch != NULL);
-  VLOG_ROW << "Channel::TransmitData() fragment_instance_id="
-           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
-           << " #rows=" << batch->num_rows;
-  TTransmitDataParams params;
-  params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_dest_fragment_instance_id(fragment_instance_id_);
-  params.__set_dest_node_id(dest_node_id_);
-  params.__set_row_batch(*batch);  // yet another copy
-  params.__set_eos(false);
-  params.__set_sender_id(parent_->sender_id_);
-
-  ImpalaBackendConnection client(runtime_state_->impalad_client_cache(),
-      address_, &rpc_status_);
-  if (!rpc_status_.ok()) return;
-
-  TTransmitDataResult res;
-  client->SetTransmitDataCounter(parent_->thrift_transmit_timer_);
-  rpc_status_ = DoTransmitDataRpc(&client, params, &res);
-  client->ResetTransmitDataCounter();
-  if (!rpc_status_.ok()) return;
-  COUNTER_ADD(parent_->profile_->total_time_counter(),
-      parent_->thrift_transmit_timer_->LapTime());
-
-  if (res.status.status_code != TErrorCode::OK) {
-    rpc_status_ = res.status;
-  } else {
-    num_data_bytes_sent_ += RowBatch::GetSerializedSize(*batch);
-    VLOG_ROW << "incremented #data_bytes_sent="
-             << num_data_bytes_sent_;
-  }
-}
-
-Status DataStreamSender::Channel::DoTransmitDataRpc(ImpalaBackendConnection* client,
-    const TTransmitDataParams& params, TTransmitDataResult* res) {
-  Status status = client->DoRpc(&ImpalaBackendClient::TransmitData, params, res);
-  while (status.code() == TErrorCode::RPC_RECV_TIMEOUT &&
-      !runtime_state_->is_cancelled()) {
-    status = client->RetryRpcRecv(&ImpalaBackendClient::recv_TransmitData, res);
-  }
-  return status;
-}
-
-void DataStreamSender::Channel::WaitForRpc() {
-  SCOPED_TIMER(parent_->state_->total_network_send_timer());
-  unique_lock<mutex> l(rpc_thread_lock_);
-  while (rpc_in_flight_) {
-    rpc_done_cv_.Wait(l);
-  }
-}
-
-inline Status DataStreamSender::Channel::AddRow(TupleRow* row) {
-  if (batch_->AtCapacity()) {
-    // batch_ is full, let's send it; but first wait for an ongoing
-    // transmission to finish before modifying thrift_batch_
-    RETURN_IF_ERROR(SendCurrentBatch());
-  }
-  TupleRow* dest = batch_->GetRow(batch_->AddRow());
-  const vector<TupleDescriptor*>& descs = row_desc_->tuple_descriptors();
-  for (int i = 0; i < descs.size(); ++i) {
-    if (UNLIKELY(row->GetTuple(i) == NULL)) {
-      dest->SetTuple(i, NULL);
-    } else {
-      dest->SetTuple(i, row->GetTuple(i)->DeepCopy(*descs[i], batch_->tuple_data_pool()));
-    }
-  }
-  batch_->CommitLastRow();
-  return Status::OK();
-}
-
-Status DataStreamSender::Channel::SendCurrentBatch() {
-  // make sure there's no in-flight TransmitData() call that might still want to
-  // access thrift_batch_
-  WaitForRpc();
-  RETURN_IF_ERROR(parent_->SerializeBatch(batch_.get(), &thrift_batch_));
-  batch_->Reset();
-  RETURN_IF_ERROR(SendBatch(&thrift_batch_));
-  return Status::OK();
-}
-
-Status DataStreamSender::Channel::GetSendStatus() {
-  WaitForRpc();
-  if (!rpc_status_.ok()) {
-    LOG(ERROR) << "channel send to " << TNetworkAddressToString(address_) << " failed "
-               << "(fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
-               << rpc_status_.GetDetail();
-  }
-  return rpc_status_;
-}
-
-Status DataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
-  VLOG_RPC << "Channel::FlushAndSendEos() fragment_instance_id="
-           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
-           << " #rows= " << batch_->num_rows();
-
-  // We can return an error here and not go on to send the EOS RPC because the error that
-  // we returned will be sent to the coordinator who will then cancel all the remote
-  // fragments including the one that this sender is sending to.
-  if (batch_->num_rows() > 0) {
-    // flush
-    RETURN_IF_ERROR(SendCurrentBatch());
-  }
-
-  RETURN_IF_ERROR(GetSendStatus());
-
-  Status client_cnxn_status;
-  ImpalaBackendConnection client(runtime_state_->impalad_client_cache(),
-      address_, &client_cnxn_status);
-  RETURN_IF_ERROR(client_cnxn_status);
-
-  TTransmitDataParams params;
-  params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_dest_fragment_instance_id(fragment_instance_id_);
-  params.__set_dest_node_id(dest_node_id_);
-  params.__set_sender_id(parent_->sender_id_);
-  params.__set_eos(true);
-  TTransmitDataResult res;
-
-  VLOG_RPC << "calling TransmitData(eos=true) to terminate channel.";
-  rpc_status_ = DoTransmitDataRpc(&client, params, &res);
-  if (!rpc_status_.ok()) {
-    LOG(ERROR) << "Failed to send EOS to " << TNetworkAddressToString(address_)
-               << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
-               << rpc_status_.GetDetail();
-    return rpc_status_;
-  }
-  return Status(res.status);
-}
-
-void DataStreamSender::Channel::Teardown(RuntimeState* state) {
-  // FlushAndSendEos() should have been called before calling Teardown(), which means that
-  // all the data should already be drained. Calling DrainAndShutdown() only to shutdown.
-  rpc_thread_.DrainAndShutdown();
-  batch_.reset();
-}
-
-DataStreamSender::DataStreamSender(int sender_id, const RowDescriptor* row_desc,
-    const TDataStreamSink& sink, const vector<TPlanFragmentDestination>& destinations,
-    int per_channel_buffer_size, RuntimeState* state)
-  : DataSink(row_desc,
-        Substitute("DataStreamSender (dst_id=$0)", sink.dest_node_id), state),
-    sender_id_(sender_id),
-    partition_type_(sink.output_partition.type),
-    current_channel_idx_(0),
-    flushed_(false),
-    current_thrift_batch_(&thrift_batch1_),
-    serialize_batch_timer_(NULL),
-    thrift_transmit_timer_(NULL),
-    bytes_sent_counter_(NULL),
-    total_sent_rows_counter_(NULL),
-    dest_node_id_(sink.dest_node_id),
-    next_unknown_partition_(0) {
-  DCHECK_GT(destinations.size(), 0);
-  DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
-      || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
-      || sink.output_partition.type == TPartitionType::RANDOM
-      || sink.output_partition.type == TPartitionType::KUDU);
-  // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
-  for (int i = 0; i < destinations.size(); ++i) {
-    channels_.push_back(
-        new Channel(this, row_desc, destinations[i].thrift_backend,
-            destinations[i].fragment_instance_id, sink.dest_node_id,
-            per_channel_buffer_size));
-  }
-
-  if (partition_type_ == TPartitionType::UNPARTITIONED
-      || partition_type_ == TPartitionType::RANDOM) {
-    // Randomize the order we open/transmit to channels to avoid thundering herd problems.
-    random_shuffle(channels_.begin(), channels_.end());
-  }
-}
-
-DataStreamSender::~DataStreamSender() {
-  // TODO: check that sender was either already closed() or there was an error
-  // on some channel
-  for (int i = 0; i < channels_.size(); ++i) {
-    delete channels_[i];
-  }
-}
-
-Status DataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
-    const TDataSink& tsink, RuntimeState* state) {
-  DCHECK(tsink.__isset.stream_sink);
-  if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
-      partition_type_ == TPartitionType::KUDU) {
-    RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
-        *row_desc_, state, &partition_exprs_));
-  }
-  return Status::OK();
-}
-
-Status DataStreamSender::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
-  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
-  state_ = state;
-  SCOPED_TIMER(profile_->total_time_counter());
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state,
-      state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
-      &partition_expr_evals_));
-  bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
-  uncompressed_bytes_counter_ =
-      ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
-  serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
-  thrift_transmit_timer_ =
-      profile()->AddConcurrentTimerCounter("TransmitDataRPCTime", TUnit::TIME_NS);
-  network_throughput_ =
-      profile()->AddDerivedCounter("NetworkThroughput(*)", TUnit::BYTES_PER_SECOND,
-          bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
-                                       thrift_transmit_timer_));
-  overall_throughput_ =
-      profile()->AddDerivedCounter("OverallThroughput", TUnit::BYTES_PER_SECOND,
-           bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
-                         profile()->total_time_counter()));
-
-  total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsReturned", TUnit::UNIT);
-  for (int i = 0; i < channels_.size(); ++i) {
-    RETURN_IF_ERROR(channels_[i]->Init(state));
-  }
-  return Status::OK();
-}
-
-Status DataStreamSender::Open(RuntimeState* state) {
-  return ScalarExprEvaluator::Open(partition_expr_evals_, state);
-}
-
-Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
-  DCHECK(!closed_);
-  DCHECK(!flushed_);
-
-  if (batch->num_rows() == 0) return Status::OK();
-  if (partition_type_ == TPartitionType::UNPARTITIONED || channels_.size() == 1) {
-    // current_thrift_batch_ is *not* the one that was written by the last call
-    // to Serialize()
-    RETURN_IF_ERROR(SerializeBatch(batch, current_thrift_batch_, channels_.size()));
-    // SendBatch() will block if there are still in-flight rpcs (and those will
-    // reference the previously written thrift batch)
-    for (int i = 0; i < channels_.size(); ++i) {
-      RETURN_IF_ERROR(channels_[i]->SendBatch(current_thrift_batch_));
-    }
-    current_thrift_batch_ =
-        (current_thrift_batch_ == &thrift_batch1_ ? &thrift_batch2_ : &thrift_batch1_);
-  } else if (partition_type_ == TPartitionType::RANDOM) {
-    // Round-robin batches among channels. Wait for the current channel to finish its
-    // rpc before overwriting its batch.
-    Channel* current_channel = channels_[current_channel_idx_];
-    current_channel->WaitForRpc();
-    RETURN_IF_ERROR(SerializeBatch(batch, current_channel->thrift_batch()));
-    RETURN_IF_ERROR(current_channel->SendBatch(current_channel->thrift_batch()));
-    current_channel_idx_ = (current_channel_idx_ + 1) % channels_.size();
-  } else if (partition_type_ == TPartitionType::KUDU) {
-    DCHECK_EQ(partition_expr_evals_.size(), 1);
-    int num_channels = channels_.size();
-    const int num_rows = batch->num_rows();
-    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
-    int channel_ids[hash_batch_size];
-
-    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
-      const int batch_window_size = min(num_rows - batch_start, hash_batch_size);
-      for (int i = 0; i < batch_window_size; ++i) {
-        TupleRow* row = batch->GetRow(i + batch_start);
-        int32_t partition =
-            *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
-        if (partition < 0) {
-          // This row doesn't correspond to a partition,
-          //  e.g. it's outside the given ranges.
-          partition = next_unknown_partition_;
-          ++next_unknown_partition_;
-        }
-        channel_ids[i] = partition % num_channels;
-      }
-
-      for (int i = 0; i < batch_window_size; ++i) {
-        TupleRow* row = batch->GetRow(i + batch_start);
-        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
-      }
-    }
-  } else {
-    DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED);
-    // hash-partition batch's rows across channels
-    // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
-    // once we have codegen here.
-    int num_channels = channels_.size();
-    const int num_partition_exprs = partition_exprs_.size();
-    const int num_rows = batch->num_rows();
-    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
-    int channel_ids[hash_batch_size];
-
-    // Break the loop into two parts break the data dependency between computing
-    // the hash and calling AddRow()
-    // To keep stack allocation small a RowBatch::HASH_BATCH is used
-    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
-      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
-      for (int i = 0; i < batch_window_size; ++i) {
-        TupleRow* row = batch->GetRow(i + batch_start);
-        uint64_t hash_val = EXCHANGE_HASH_SEED;
-        for (int j = 0; j < num_partition_exprs; ++j) {
-          ScalarExprEvaluator* eval = partition_expr_evals_[j];
-          void* partition_val = eval->GetValue(row);
-          // We can't use the crc hash function here because it does not result in
-          // uncorrelated hashes with different seeds. Instead we use FastHash.
-          // TODO: fix crc hash/GetHashValue()
-          DCHECK(&(eval->root()) == partition_exprs_[j]);
-          hash_val = RawValue::GetHashValueFastHash(
-              partition_val, partition_exprs_[j]->type(), hash_val);
-        }
-        channel_ids[i] = hash_val % num_channels;
-      }
-
-      for (int i = 0; i < batch_window_size; ++i) {
-        TupleRow* row = batch->GetRow(i + batch_start);
-        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
-      }
-    }
-  }
-  COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
-  expr_results_pool_->Clear();
-  RETURN_IF_ERROR(state->CheckQueryState());
-  return Status::OK();
-}
-
-Status DataStreamSender::FlushFinal(RuntimeState* state) {
-  DCHECK(!flushed_);
-  DCHECK(!closed_);
-  flushed_ = true;
-  for (int i = 0; i < channels_.size(); ++i) {
-    // If we hit an error here, we can return without closing the remaining channels as
-    // the error is propagated back to the coordinator, which in turn cancels the query,
-    // which will cause the remaining open channels to be closed.
-    RETURN_IF_ERROR(channels_[i]->FlushAndSendEos(state));
-  }
-  return Status::OK();
-}
-
-void DataStreamSender::Close(RuntimeState* state) {
-  if (closed_) return;
-  for (int i = 0; i < channels_.size(); ++i) {
-    channels_[i]->Teardown(state);
-  }
-  ScalarExprEvaluator::Close(partition_expr_evals_, state);
-  ScalarExpr::Close(partition_exprs_);
-  DataSink::Close(state);
-}
-
-Status DataStreamSender::SerializeBatch(
-    RowBatch* src, TRowBatch* dest, int num_receivers) {
-  VLOG_ROW << "serializing " << src->num_rows() << " rows";
-  {
-    SCOPED_TIMER(profile_->total_time_counter());
-    SCOPED_TIMER(serialize_batch_timer_);
-    RETURN_IF_ERROR(src->Serialize(dest));
-    int64_t bytes = RowBatch::GetSerializedSize(*dest);
-    int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest);
-    COUNTER_ADD(bytes_sent_counter_, bytes * num_receivers);
-    COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers);
-  }
-  return Status::OK();
-}
-
-int64_t DataStreamSender::GetNumDataBytesSent() const {
-  // TODO: do we need synchronization here or are reads & writes to 8-byte ints
-  // atomic?
-  int64_t result = 0;
-  for (int i = 0; i < channels_.size(); ++i) {
-    result += channels_[i]->num_data_bytes_sent();
-  }
-  return result;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h
deleted file mode 100644
index 37b9417..0000000
--- a/be/src/runtime/data-stream-sender.h
+++ /dev/null
@@ -1,158 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_RUNTIME_DATA_STREAM_SENDER_H
-#define IMPALA_RUNTIME_DATA_STREAM_SENDER_H
-
-#include <vector>
-#include <string>
-
-#include "exec/data-sink.h"
-#include "common/global-types.h"
-#include "common/object-pool.h"
-#include "common/status.h"
-#include "util/runtime-profile.h"
-#include "gen-cpp/Results_types.h" // for TRowBatch
-
-namespace impala {
-
-class RowBatch;
-class RowDescriptor;
-
-class MemTracker;
-class TDataStreamSink;
-class TNetworkAddress;
-class TPlanFragmentDestination;
-
-/// Single sender of an m:n data stream.
-/// Row batch data is routed to destinations based on the provided
-/// partitioning specification.
-/// *Not* thread-safe.
-//
-/// TODO: capture stats that describe distribution of rows/data volume
-/// across channels.
-/// TODO: create a PlanNode equivalent class for DataSink.
-class DataStreamSender : public DataSink {
- public:
-  /// Construct a sender according to the output specification (sink),
-  /// sending to the given destinations. sender_id identifies this
-  /// sender instance, and is unique within a fragment.
-  /// Per_channel_buffer_size is the buffer size allocated to each channel
-  /// and is specified in bytes.
-  /// The RowDescriptor must live until Close() is called.
-  /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
-  /// and RANDOM.
-  DataStreamSender(int sender_id, const RowDescriptor* row_desc,
-      const TDataStreamSink& tsink,
-      const std::vector<TPlanFragmentDestination>& destinations,
-      int per_channel_buffer_size, RuntimeState* state);
-
-  virtual ~DataStreamSender();
-
-  /// Must be called before other API calls, and before the codegen'd IR module is
-  /// compiled (i.e. in an ExecNode's Prepare() function).
-  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
-
-  /// Must be called before Send() or Close(), and after the codegen'd IR module is
-  /// compiled (i.e. in an ExecNode's Open() function).
-  virtual Status Open(RuntimeState* state);
-
-  /// Flush all buffered data and close all existing channels to destination hosts.
-  /// Further Send() calls are illegal after calling FlushFinal().
-  /// It is legal to call FlushFinal() only 0 or 1 times.
-  virtual Status FlushFinal(RuntimeState* state);
-
-  /// Send data in 'batch' to destination nodes according to partitioning
-  /// specification provided in c'tor.
-  /// Blocks until all rows in batch are placed in their appropriate outgoing
-  /// buffers (ie, blocks if there are still in-flight rpcs from the last
-  /// Send() call).
-  virtual Status Send(RuntimeState* state, RowBatch* batch);
-
-  /// Shutdown all existing channels to destination hosts. Further FlushFinal() calls are
-  /// illegal after calling Close().
-  virtual void Close(RuntimeState* state);
-
-  /// Serializes the src batch into the dest thrift batch. Maintains metrics.
-  /// num_receivers is the number of receivers this batch will be sent to. Only
-  /// used to maintain metrics.
-  Status SerializeBatch(RowBatch* src, TRowBatch* dest, int num_receivers = 1);
-
- protected:
-  friend class DataStreamTest;
-
-  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
-      const TDataSink& tsink, RuntimeState* state);
-
-  /// Return total number of bytes sent in TRowBatch.data. If batches are
-  /// broadcast to multiple receivers, they are counted once per receiver.
-  int64_t GetNumDataBytesSent() const;
-
- private:
-  class Channel;
-
-  /// Sender instance id, unique within a fragment.
-  int sender_id_;
-  RuntimeState* state_;
-  TPartitionType::type partition_type_; // The type of partitioning to perform.
-  int current_channel_idx_; // index of current channel to send to if random_ == true
-
-  /// If true, this sender has called FlushFinal() successfully.
-  /// Not valid to call Send() anymore.
-  bool flushed_;
-
-  /// serialized batches for broadcasting; we need two so we can write
-  /// one while the other one is still being sent
-  TRowBatch thrift_batch1_;
-  TRowBatch thrift_batch2_;
-  TRowBatch* current_thrift_batch_;  // the next one to fill in Send()
-
-  std::vector<Channel*> channels_;
-
-  /// Expressions of partition keys. It's used to compute the
-  /// per-row partition values for shuffling exchange;
-  std::vector<ScalarExpr*> partition_exprs_;
-  std::vector<ScalarExprEvaluator*> partition_expr_evals_;
-
-  RuntimeProfile::Counter* serialize_batch_timer_;
-  /// The concurrent wall time spent sending data over the network.
-  RuntimeProfile::ConcurrentTimerCounter* thrift_transmit_timer_;
-  RuntimeProfile::Counter* bytes_sent_counter_;
-  RuntimeProfile::Counter* uncompressed_bytes_counter_;
-  RuntimeProfile::Counter* total_sent_rows_counter_;
-
-  /// Throughput per time spent in TransmitData
-  RuntimeProfile::Counter* network_throughput_;
-
-  /// Throughput per total time spent in sender
-  RuntimeProfile::Counter* overall_throughput_;
-
-  /// Identifier of the destination plan node.
-  PlanNodeId dest_node_id_;
-
-  /// Used for Kudu partitioning to round-robin rows that don't correspond to a partition
-  /// or when errors are encountered.
-  int next_unknown_partition_;
-
-  /// An arbitrary hash seed used for exchanges.
-  static constexpr uint64_t EXCHANGE_HASH_SEED = 0x66bd68df22c3ef37;
-};
-
-}
-
-#endif


[2/5] impala git commit: IMPALA-7291: [DOCS] Note about no codegen support for CHAR

Posted by bh...@apache.org.
IMPALA-7291: [DOCS] Note about no codegen support for CHAR

Also, cleaned up confusing examples.

Change-Id: Id89dcf44e31f1bc56d888527585b3ec90229981a
Reviewed-on: http://gerrit.cloudera.org:8080/11022
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f9e7d938
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f9e7d938
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f9e7d938

Branch: refs/heads/master
Commit: f9e7d938546b5bad607075d4e852e21558d4d67c
Parents: 514dbb7
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Mon Jul 23 15:22:42 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 24 00:35:48 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_char.xml | 223 +++++++++++++++------------------------
 1 file changed, 87 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f9e7d938/docs/topics/impala_char.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_char.xml b/docs/topics/impala_char.xml
index 9204812..5286a3c 100644
--- a/docs/topics/impala_char.xml
+++ b/docs/topics/impala_char.xml
@@ -21,7 +21,13 @@ under the License.
 <concept id="char" rev="2.0.0">
 
   <title>CHAR Data Type (<keyword keyref="impala20"/> or higher only)</title>
-  <titlealts audience="PDF"><navtitle>CHAR</navtitle></titlealts>
+
+  <titlealts audience="PDF">
+
+    <navtitle>CHAR</navtitle>
+
+  </titlealts>
+
   <prolog>
     <metadata>
       <data name="Category" value="Impala"/>
@@ -36,9 +42,9 @@ under the License.
   <conbody>
 
     <p rev="2.0.0">
-      <indexterm audience="hidden">CHAR data type</indexterm>
-      A fixed-length character type, padded with trailing spaces if necessary to achieve the specified length. If
-      values are longer than the specified length, Impala truncates any trailing characters.
+      A fixed-length character type, padded with trailing spaces if necessary to achieve the
+      specified length. If values are longer than the specified length, Impala truncates any
+      trailing characters.
     </p>
 
     <p conref="../shared/impala_common.xml#common/syntax_blurb"/>
@@ -50,7 +56,7 @@ under the License.
 <codeblock><varname>column_name</varname> CHAR(<varname>length</varname>)</codeblock>
 
     <p>
-      The maximum length you can specify is 255.
+      The maximum <varname>length</varname> you can specify is 255.
     </p>
 
     <p>
@@ -59,21 +65,54 @@ under the License.
 
     <ul>
       <li>
-        When you store a <codeph>CHAR</codeph> value shorter than the specified length in a table, queries return
-        the value padded with trailing spaces if necessary; the resulting value has the same length as specified in
-        the column definition.
+        When you store a <codeph>CHAR</codeph> value shorter than the specified length in a
+        table, queries return the value padded with trailing spaces if necessary; the resulting
+        value has the same length as specified in the column definition.
+      </li>
+
+      <li>
+        Leading spaces in <codeph>CHAR</codeph> are preserved within the data file.
+      </li>
+
+      <li>
+        If you store a <codeph>CHAR</codeph> value containing trailing spaces in a table, those
+        trailing spaces are not stored in the data file. When the value is retrieved by a query,
+        the result could have a different number of trailing spaces. That is, the value includes
+        however many spaces are needed to pad it to the specified length of the column.
       </li>
 
       <li>
-        If you store a <codeph>CHAR</codeph> value containing trailing spaces in a table, those trailing spaces are
-        not stored in the data file. When the value is retrieved by a query, the result could have a different
-        number of trailing spaces. That is, the value includes however many spaces are needed to pad it to the
-        specified length of the column.
+        If you compare two <codeph>CHAR</codeph> values that differ only in the number of
+        trailing spaces, those values are considered identical.
       </li>
 
       <li>
-        If you compare two <codeph>CHAR</codeph> values that differ only in the number of trailing spaces, those
-        values are considered identical.
+        When comparing or processing <codeph>CHAR</codeph> values:
+        <ul>
+          <li>
+            <codeph>CAST()</codeph> truncates any longer string to fit within
+            the defined length. For example:
+<codeblock>SELECT CAST('x' AS CHAR(4)) = CAST('x        ' AS CHAR(4)); -- Returns TRUE.
+</codeblock>
+          </li>
+          <li>
+            If a <codeph>CHAR</codeph> value is shorter than the specified
+            length, it is padded on the right with spaces until it matches the
+            specified length.
+          </li>
+          <li>
+            <codeph>CHAR_LENGTH()</codeph> returns the length including any
+            trailing spaces.
+          </li>
+          <li>
+            <codeph>LENGTH()</codeph> returns the length excluding trailing
+            spaces.
+          </li>
+          <li>
+            <codeph>CONCAT()</codeph> returns the length including trailing
+            spaces.
+          </li>
+        </ul>
       </li>
     </ul>
 
@@ -93,18 +132,19 @@ under the License.
       </li>
 
       <li>
-        Parquet files generated by Impala and containing this type can be freely interchanged with other components
-        such as Hive and MapReduce.
+        Parquet files generated by Impala and containing this type can be freely interchanged
+        with other components such as Hive and MapReduce.
       </li>
 
       <li>
-        Any trailing spaces, whether implicitly or explicitly specified, are not written to the Parquet data files.
+        Any trailing spaces, whether implicitly or explicitly specified, are not written to the
+        Parquet data files.
       </li>
 
       <li>
         Parquet data files might contain values that are longer than allowed by the
-        <codeph>CHAR(<varname>n</varname>)</codeph> length limit. Impala ignores any extra trailing characters when
-        it processes those values during a query.
+        <codeph>CHAR(<varname>n</varname>)</codeph> length limit. Impala ignores any extra
+        trailing characters when it processes those values during a query.
       </li>
     </ul>
 
@@ -112,14 +152,18 @@ under the License.
 
     <p>
       Text data files might contain values that are longer than allowed for a particular
-      <codeph>CHAR(<varname>n</varname>)</codeph> column. Any extra trailing characters are ignored when Impala
-      processes those values during a query. Text data files can also contain values that are shorter than the
-      defined length limit, and Impala pads them with trailing spaces up to the specified length. Any text data
-      files produced by Impala <codeph>INSERT</codeph> statements do not include any trailing blanks for
+      <codeph>CHAR(<varname>n</varname>)</codeph> column. Any extra trailing characters are
+      ignored when Impala processes those values during a query. Text data files can also
+      contain values that are shorter than the defined length limit, and Impala pads them with
+      trailing spaces up to the specified length. Any text data files produced by Impala
+      <codeph>INSERT</codeph> statements do not include any trailing blanks for
       <codeph>CHAR</codeph> columns.
     </p>
 
-    <p><b>Avro considerations:</b></p>
+    <p>
+      <b>Avro considerations:</b>
+    </p>
+
     <p conref="../shared/impala_common.xml#common/avro_2gb_strings"/>
 
     <p conref="../shared/impala_common.xml#common/compatibility_blurb"/>
@@ -129,7 +173,8 @@ under the License.
     </p>
 
     <p>
-      Some other database systems make the length specification optional. For Impala, the length is required.
+      Some other database systems make the length specification optional. For Impala, the length
+      is required.
     </p>
 
 <!--
@@ -146,142 +191,46 @@ it silently treats the value as length 255.
 
     <p conref="../shared/impala_common.xml#common/column_stats_constant"/>
 
-<!-- Seems like a logical design decision but don't think it's currently implemented like this.
-<p>
-Because both the maximum and average length are always known and always the same for
-any given <codeph>CHAR(<varname>n</varname>)</codeph> column, those fields are always filled
-in for <codeph>SHOW COLUMN STATS</codeph> output, even before you run
-<codeph>COMPUTE STATS</codeph> on the table.
-</p>
--->
-
     <p conref="../shared/impala_common.xml#common/udf_blurb_no"/>
 
-    <p conref="../shared/impala_common.xml#common/example_blurb"/>
-
-    <p>
-      These examples show how trailing spaces are not considered significant when comparing or processing
-      <codeph>CHAR</codeph> values. <codeph>CAST()</codeph> truncates any longer string to fit within the defined
-      length. If a <codeph>CHAR</codeph> value is shorter than the specified length, it is padded on the right with
-      spaces until it matches the specified length. Therefore, <codeph>LENGTH()</codeph> represents the length
-      including any trailing spaces, and <codeph>CONCAT()</codeph> also treats the column value as if it has
-      trailing spaces.
-    </p>
-
-<codeblock>select cast('x' as char(4)) = cast('x   ' as char(4)) as "unpadded equal to padded";
-+--------------------------+
-| unpadded equal to padded |
-+--------------------------+
-| true                     |
-+--------------------------+
-
-create table char_length(c char(3));
-insert into char_length values (cast('1' as char(3))), (cast('12' as char(3))), (cast('123' as char(3))), (cast('123456' as char(3)));
-select concat("[",c,"]") as c, length(c) from char_length;
-+-------+-----------+
-| c     | length(c) |
-+-------+-----------+
-| [1  ] | 3         |
-| [12 ] | 3         |
-| [123] | 3         |
-| [123] | 3         |
-+-------+-----------+
-</codeblock>
-
-    <p>
-      This example shows a case where data values are known to have a specific length, where <codeph>CHAR</codeph>
-      is a logical data type to use.
-<!--
-Because all the <codeph>CHAR</codeph> values have a constant predictable length,
-Impala can efficiently analyze how best to use these values in join queries,
-aggregation queries, and other contexts where column length is significant.
--->
-    </p>
+    <p conref="../shared/impala_common.xml#common/kudu_blurb"/>
 
-<codeblock>create table addresses
-  (id bigint,
-   street_name string,
-   state_abbreviation char(2),
-   country_abbreviation char(2));
-</codeblock>
+    <p conref="../shared/impala_common.xml#common/kudu_unsupported_data_type"/>
 
     <p>
-      The following example shows how values written by Impala do not physically include the trailing spaces. It
-      creates a table using text format, with <codeph>CHAR</codeph> values much shorter than the declared length,
-      and then prints the resulting data file to show that the delimited values are not separated by spaces. The
-      same behavior applies to binary-format Parquet data files.
+      <b>Performance consideration:</b>
     </p>
 
-<codeblock>create table char_in_text (a char(20), b char(30), c char(40))
-  row format delimited fields terminated by ',';
-
-insert into char_in_text values (cast('foo' as char(20)), cast('bar' as char(30)), cast('baz' as char(40))), (cast('hello' as char(20)), cast('goodbye' as char(30)), cast('aloha' as char(40)));
-
--- Running this Linux command inside impala-shell using the ! shortcut.
-!hdfs dfs -cat 'hdfs://127.0.0.1:8020/user/hive/warehouse/impala_doc_testing.db/char_in_text/*.*';
-foo,bar,baz
-hello,goodbye,aloha
-</codeblock>
-
     <p>
-      The following example further illustrates the treatment of spaces. It replaces the contents of the previous
-      table with some values including leading spaces, trailing spaces, or both. Any leading spaces are preserved
-      within the data file, but trailing spaces are discarded. Then when the values are retrieved by a query, the
-      leading spaces are retrieved verbatim while any necessary trailing spaces are supplied by Impala.
+      The <codeph>CHAR</codeph> type currently does not have the Impala Codegen support, and we
+      recommend using <codeph>VARCHAR</codeph> or <codeph>STRING</codeph> over
+      <codeph>CHAR</codeph> as the performance gain of Codegen outweighs the benefits of fixed
+      width <codeph>CHAR</codeph>.
     </p>
 
-<codeblock>insert overwrite char_in_text values (cast('trailing   ' as char(20)), cast('   leading and trailing   ' as char(30)), cast('   leading' as char(40)));
-!hdfs dfs -cat 'hdfs://127.0.0.1:8020/user/hive/warehouse/impala_doc_testing.db/char_in_text/*.*';
-trailing,   leading and trailing,   leading
-
-select concat('[',a,']') as a, concat('[',b,']') as b, concat('[',c,']') as c from char_in_text;
-+------------------------+----------------------------------+--------------------------------------------+
-| a                      | b                                | c                                          |
-+------------------------+----------------------------------+--------------------------------------------+
-| [trailing            ] | [   leading and trailing       ] | [   leading                              ] |
-+------------------------+----------------------------------+--------------------------------------------+
-</codeblock>
-
-    <p conref="../shared/impala_common.xml#common/kudu_blurb"/>
-    <p conref="../shared/impala_common.xml#common/kudu_unsupported_data_type"/>
-
     <p conref="../shared/impala_common.xml#common/restrictions_blurb"/>
 
     <p>
-      Because the blank-padding behavior requires allocating the maximum length for each value in memory, for
-      scalability reasons avoid declaring <codeph>CHAR</codeph> columns that are much longer than typical values in
-      that column.
+      Because the blank-padding behavior requires allocating the maximum length for each value
+      in memory, for scalability reasons, you should avoid declaring <codeph>CHAR</codeph>
+      columns that are much longer than typical values in that column.
     </p>
 
     <p conref="../shared/impala_common.xml#common/blobs_are_strings"/>
 
     <p>
       When an expression compares a <codeph>CHAR</codeph> with a <codeph>STRING</codeph> or
-      <codeph>VARCHAR</codeph>, the <codeph>CHAR</codeph> value is implicitly converted to <codeph>STRING</codeph>
-      first, with trailing spaces preserved.
+      <codeph>VARCHAR</codeph>, the <codeph>CHAR</codeph> value is implicitly converted to
+      <codeph>STRING</codeph> first, with trailing spaces preserved.
     </p>
 
-<codeblock>select cast("foo  " as char(5)) = 'foo' as "char equal to string";
-+----------------------+
-| char equal to string |
-+----------------------+
-| false                |
-+----------------------+
-</codeblock>
-
     <p>
       This behavior differs from other popular database systems. To get the expected result of
-      <codeph>TRUE</codeph>, cast the expressions on both sides to <codeph>CHAR</codeph> values of the appropriate
-      length:
+      <codeph>TRUE</codeph>, cast the expressions on both sides to <codeph>CHAR</codeph> values
+      of the appropriate length. For example:
     </p>
 
-<codeblock>select cast("foo  " as char(5)) = cast('foo' as char(3)) as "char equal to string";
-+----------------------+
-| char equal to string |
-+----------------------+
-| true                 |
-+----------------------+
-</codeblock>
+<codeblock>SELECT CAST("foo  " AS CHAR(5)) = CAST('foo' AS CHAR(3)); -- Returns TRUE.</codeblock>
 
     <p>
       This behavior is subject to change in future releases.
@@ -294,5 +243,7 @@ select concat('[',a,']') as a, concat('[',b,']') as b, concat('[',c,']') as c fr
       <xref href="impala_literals.xml#string_literals"/>,
       <xref href="impala_string_functions.xml#string_functions"/>
     </p>
+
   </conbody>
+
 </concept>


[3/5] impala git commit: IMPALA-7212: Removes --use_krpc flag and remove old DataStream services

Posted by bh...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 9f866ba..c1f9cc6 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -30,14 +30,10 @@
 #include "rpc/rpc-mgr.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
-#include "runtime/data-stream-mgr-base.h"
-#include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/krpc-data-stream-sender.h"
-#include "runtime/data-stream-sender.h"
-#include "runtime/data-stream-recvr-base.h"
-#include "runtime/data-stream-recvr.h"
 #include "runtime/descriptors.h"
 #include "runtime/client-cache.h"
 #include "runtime/backend-client.h"
@@ -55,8 +51,6 @@
 #include "util/test-info.h"
 #include "util/tuple-row-compare.h"
 #include "gen-cpp/data_stream_service.pb.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
 #include "gen-cpp/Descriptors_types.h"
 #include "service/fe-support.h"
@@ -82,8 +76,6 @@ DECLARE_int32(datastream_service_num_deserialization_threads);
 DECLARE_int32(datastream_service_deserialization_queue_size);
 DECLARE_string(datastream_service_queue_mem_limit);
 
-DECLARE_bool(use_krpc);
-
 static const PlanNodeId DEST_NODE_ID = 1;
 static const int BATCH_CAPACITY = 100;  // rows
 static const int PER_ROW_DATA = 8;
@@ -93,39 +85,6 @@ static const int SHORT_SERVICE_QUEUE_MEM_LIMIT = 16;
 
 namespace impala {
 
-// This class acts as a service interface for all Thrift related communication within
-// this test file.
-class ImpalaThriftTestBackend : public ImpalaInternalServiceIf {
- public:
-  ImpalaThriftTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {}
-  virtual ~ImpalaThriftTestBackend() {}
-
-  virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
-      const TExecQueryFInstancesParams& params) {}
-  virtual void CancelQueryFInstances(TCancelQueryFInstancesResult& return_val,
-      const TCancelQueryFInstancesParams& params) {}
-  virtual void ReportExecStatus(TReportExecStatusResult& return_val,
-      const TReportExecStatusParams& params) {}
-  virtual void UpdateFilter(TUpdateFilterResult& return_val,
-      const TUpdateFilterParams& params) {}
-  virtual void PublishFilter(TPublishFilterResult& return_val,
-      const TPublishFilterParams& params) {}
-
-  virtual void TransmitData(
-      TTransmitDataResult& return_val, const TTransmitDataParams& params) {
-    if (!params.eos) {
-      mgr_->AddData(params.dest_fragment_instance_id, params.dest_node_id,
-                    params.row_batch, params.sender_id).SetTStatus(&return_val);
-    } else {
-      mgr_->CloseSender(params.dest_fragment_instance_id, params.dest_node_id,
-          params.sender_id).SetTStatus(&return_val);
-    }
-  }
-
- private:
-  DataStreamMgr* mgr_;
-};
-
 // This class acts as a service interface for all KRPC related communication within
 // this test file.
 class ImpalaKRPCTestBackend : public DataStreamServiceIf {
@@ -166,18 +125,7 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
   unique_ptr<MemTracker> mem_tracker_;
 };
 
-template <class T> class DataStreamTestBase : public T {
- protected:
-  virtual void SetUp() {}
-  virtual void TearDown() {}
-};
-
-enum KrpcSwitch {
-  USE_THRIFT,
-  USE_KRPC
-};
-
-class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwitch>> {
+class DataStreamTest : public testing::Test {
  protected:
   DataStreamTest() : next_val_(0) {
     // Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
@@ -186,9 +134,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   ~DataStreamTest() { runtime_state_->ReleaseResources(); }
 
   virtual void SetUp() {
-    // Initialize MemTrackers and RuntimeState for use by the data stream receiver.
-    FLAGS_use_krpc = GetParam() == USE_KRPC;
-
     exec_env_.reset(new ExecEnv());
     ABORT_IF_ERROR(exec_env_->InitForFeTests());
     exec_env_->InitBufferPool(32 * 1024, 1024 * 1024 * 1024, 32 * 1024);
@@ -234,14 +179,10 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     hash_sink_.output_partition.__isset.partition_exprs = true;
     hash_sink_.output_partition.partition_exprs.push_back(expr);
 
-    if (GetParam() == USE_THRIFT) {
-      StartThriftBackend();
-    } else {
-      IpAddr ip;
-      ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-      krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
-      StartKrpcBackend();
-    }
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
+    StartKrpcBackend();
   }
 
   const TDataSink GetSink(TPartitionType::type partition_type) {
@@ -267,12 +208,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     less_than_->Close(runtime_state_.get());
     ScalarExpr::Close(ordering_exprs_);
     mem_pool_->FreeAll();
-    if (GetParam() == USE_THRIFT) {
-      exec_env_->impalad_client_cache()->TestShutdown();
-      StopThriftBackend();
-    } else {
-      StopKrpcBackend();
-    }
+    StopKrpcBackend();
     exec_env_->buffer_pool()->DeregisterClient(&buffer_pool_client_);
   }
 
@@ -312,8 +248,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   unique_ptr<ImpalaKRPCTestBackend> test_service_;
 
   // receiving node
-  DataStreamMgrBase* stream_mgr_ = nullptr;
-  ThriftServer* server_ = nullptr;
+  KrpcDataStreamMgr* stream_mgr_ = nullptr;
 
   // sending node(s)
   TDataStreamSink broadcast_sink_;
@@ -335,7 +270,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     int receiver_num;
 
     unique_ptr<thread> thread_handle;
-    shared_ptr<DataStreamRecvrBase> stream_recvr;
+    shared_ptr<KrpcDataStreamRecvr> stream_recvr;
     Status status;
     int num_rows_received = 0;
     multiset<int64_t> data_values;
@@ -360,9 +295,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     dest.fragment_instance_id = next_instance_id_;
     dest.thrift_backend.hostname = "localhost";
     dest.thrift_backend.port = FLAGS_port;
-    if (GetParam() == USE_KRPC) {
-      dest.__set_krpc_backend(krpc_address_);
-    }
+    dest.__set_krpc_backend(krpc_address_);
     *instance_id = next_instance_id_;
     ++next_instance_id_.lo;
   }
@@ -524,7 +457,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
           // hash-partitioned streams send values to the right partition
           int64_t value = *j;
           uint64_t hash_val = RawValue::GetHashValueFastHash(&value, TYPE_BIGINT,
-              DataStreamSender::EXCHANGE_HASH_SEED);
+              KrpcDataStreamSender::EXCHANGE_HASH_SEED);
           EXPECT_EQ(hash_val % receiver_info_.size(), info->receiver_num);
         }
       }
@@ -550,21 +483,9 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     }
   }
 
-  // Start Thrift based backend in separate thread.
-  void StartThriftBackend() {
-    // Dynamic cast stream_mgr_ which is of type DataStreamMgrBase to derived type
-    // DataStreamMgr, since ImpalaThriftTestBackend() accepts only DataStreamMgr*.
-    boost::shared_ptr<ImpalaThriftTestBackend> handler(
-        new ImpalaThriftTestBackend(exec_env_->ThriftStreamMgr()));
-    boost::shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler));
-    ThriftServerBuilder builder("DataStreamTest backend", processor, FLAGS_port);
-    ASSERT_OK(builder.Build(&server_));
-    ASSERT_OK(server_->Start());
-  }
-
   void StartKrpcBackend() {
     RpcMgr* rpc_mgr = exec_env_->rpc_mgr();
-    KrpcDataStreamMgr* krpc_stream_mgr = exec_env_->KrpcStreamMgr();
+    KrpcDataStreamMgr* krpc_stream_mgr = exec_env_->stream_mgr();
     ASSERT_OK(rpc_mgr->Init());
     test_service_.reset(new ImpalaKRPCTestBackend(rpc_mgr, krpc_stream_mgr,
         exec_env_->process_mem_tracker()));
@@ -573,12 +494,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     ASSERT_OK(rpc_mgr->StartServices(krpc_address_));
   }
 
-  void StopThriftBackend() {
-    VLOG_QUERY << "stop backend\n";
-    server_->StopForTesting();
-    delete server_;
-  }
-
   void StopKrpcBackend() {
     exec_env_->rpc_mgr()->Shutdown();
   }
@@ -590,7 +505,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     sender_info_.emplace_back(make_unique<SenderInfo>());
     sender_info_.back()->thread_handle.reset(
         new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
-                   partition_type, GetParam() == USE_THRIFT));
+            partition_type));
   }
 
   void JoinSenders() {
@@ -600,8 +515,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     }
   }
 
-  void Sender(int sender_num,
-      int channel_buffer_size, TPartitionType::type partition_type, bool is_thrift) {
+  void Sender(
+      int sender_num, int channel_buffer_size, TPartitionType::type partition_type) {
     RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
@@ -615,18 +530,10 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     TExpr output_exprs;
     output_exprs.nodes.push_back(expr_node);
 
-    if (is_thrift) {
-      sender.reset(new DataStreamSender(
-          sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
-      EXPECT_OK(static_cast<DataStreamSender*>(
-          sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
-    } else {
-      sender.reset(new KrpcDataStreamSender(
-          sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
-      EXPECT_OK(static_cast<KrpcDataStreamSender*>(
-          sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
-    }
-
+    sender.reset(new KrpcDataStreamSender(
+        sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
+    EXPECT_OK(static_cast<KrpcDataStreamSender*>(
+        sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
     EXPECT_OK(sender->Prepare(&state, &tracker_));
     EXPECT_OK(sender->Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
@@ -641,13 +548,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     VLOG_QUERY << "closing sender" << sender_num;
     info->status.MergeStatus(sender->FlushFinal(&state));
     sender->Close(&state);
-    if (is_thrift) {
-      info->num_bytes_sent = static_cast<DataStreamSender*>(
-          sender.get())->GetNumDataBytesSent();
-    } else {
-      info->num_bytes_sent = static_cast<KrpcDataStreamSender*>(
-          sender.get())->GetNumDataBytesSent();
-    }
+    info->num_bytes_sent = static_cast<KrpcDataStreamSender*>(
+        sender.get())->GetNumDataBytesSent();
 
     batch->Reset();
     state.ReleaseResources();
@@ -672,18 +574,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   }
 };
 
-// A seperate class for tests that are required to be run against Thrift only.
-class DataStreamTestThriftOnly : public DataStreamTest {
- protected:
-  virtual void SetUp() {
-    DataStreamTest::SetUp();
-  }
-
-  virtual void TearDown() {
-    DataStreamTest::TearDown();
-  }
-};
-
 // A seperate test class which simulates the behavior in which deserialization queue
 // fills up and all deserialization threads are busy.
 class DataStreamTestShortDeserQueue : public DataStreamTest {
@@ -715,19 +605,7 @@ class DataStreamTestShortServiceQueue : public DataStreamTest {
   }
 };
 
-INSTANTIATE_TEST_CASE_P(ThriftOrKrpc, DataStreamTest,
-    ::testing::Values(USE_KRPC, USE_THRIFT));
-
-INSTANTIATE_TEST_CASE_P(ThriftOnly, DataStreamTestThriftOnly,
-    ::testing::Values(USE_THRIFT));
-
-INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestShortDeserQueue,
-    ::testing::Values(USE_KRPC));
-
-INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestShortServiceQueue,
-    ::testing::Values(USE_KRPC));
-
-TEST_P(DataStreamTest, UnknownSenderSmallResult) {
+TEST_F(DataStreamTest, UnknownSenderSmallResult) {
   // starting a sender w/o a corresponding receiver results in an error. No bytes should
   // be sent.
   // case 1: entire query result fits in single buffer
@@ -738,7 +616,7 @@ TEST_P(DataStreamTest, UnknownSenderSmallResult) {
   EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
 }
 
-TEST_P(DataStreamTest, UnknownSenderLargeResult) {
+TEST_F(DataStreamTest, UnknownSenderLargeResult) {
   // case 2: query result requires multiple buffers
   TUniqueId dummy_id;
   GetNextInstanceId(&dummy_id);
@@ -747,7 +625,7 @@ TEST_P(DataStreamTest, UnknownSenderLargeResult) {
   EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
 }
 
-TEST_P(DataStreamTest, Cancel) {
+TEST_F(DataStreamTest, Cancel) {
   TUniqueId instance_id;
   StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, &instance_id);
   stream_mgr_->Cancel(instance_id);
@@ -758,7 +636,7 @@ TEST_P(DataStreamTest, Cancel) {
   EXPECT_TRUE(receiver_info_[1]->status.IsCancelled());
 }
 
-TEST_P(DataStreamTest, BasicTest) {
+TEST_F(DataStreamTest, BasicTest) {
   // TODO: also test that all client connections have been returned
   TPartitionType::type stream_types[] =
       {TPartitionType::UNPARTITIONED, TPartitionType::RANDOM,
@@ -781,54 +659,6 @@ TEST_P(DataStreamTest, BasicTest) {
   }
 }
 
-// This test checks for the avoidance of IMPALA-2931, which is a crash that would occur if
-// the parent memtracker of a DataStreamRecvr's memtracker was deleted before the
-// DataStreamRecvr was destroyed. The fix was to move decoupling the child tracker from
-// the parent into DataStreamRecvr::Close() which should always be called before the
-// parent is destroyed. In practice the parent is a member of the query's runtime state.
-//
-// TODO: Make lifecycle requirements more explicit.
-TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
-  scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), exec_env_.get()));
-  RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
-
-  // Start just one receiver.
-  TUniqueId instance_id;
-  GetNextInstanceId(&instance_id);
-  shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(row_desc_,
-      instance_id, DEST_NODE_ID, 1, 1, false, profile, &tracker_, nullptr);
-
-  // Perform tear down, but keep a reference to the receiver so that it is deleted last
-  // (to confirm that the destructor does not access invalid state after tear-down).
-  stream_recvr->Close();
-
-  // Force deletion of the parent memtracker by destroying it's owning runtime state.
-  runtime_state->ReleaseResources();
-  runtime_state.reset();
-
-  // Send an eos RPC to the receiver. Not required for tear-down, but confirms that the
-  // RPC does not cause an error (the receiver will still be called, since it is only
-  // Close()'d, not deleted from the data stream manager).
-  Status rpc_status;
-  ImpalaBackendConnection client(exec_env_->impalad_client_cache(),
-      MakeNetworkAddress("localhost", FLAGS_port), &rpc_status);
-  EXPECT_OK(rpc_status);
-  TTransmitDataParams params;
-  params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_eos(true);
-  params.__set_dest_fragment_instance_id(instance_id);
-  params.__set_dest_node_id(DEST_NODE_ID);
-  TUniqueId dummy_id;
-  params.__set_sender_id(0);
-
-  TTransmitDataResult result;
-  rpc_status = client.DoRpc(&ImpalaBackendClient::TransmitData, params, &result);
-
-  // Finally, stream_recvr destructor happens here. Before fix for IMPALA-2931, this
-  // would have resulted in a crash.
-  stream_recvr.reset();
-}
-
 // This test is to exercise a previously present deadlock path which is now fixed, to
 // ensure that the deadlock does not happen anymore. It does this by doing the following:
 // This test starts multiple senders to send to the same receiver. It makes sure that
@@ -841,7 +671,7 @@ TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
 // already being deserialized will be waiting on the KrpcDataStreamMgr::lock_ as well.
 // But the first thread will never release the lock since it's stuck on Offer(), causing
 // a deadlock. This is fixed with IMPALA-6346.
-TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
+TEST_F(DataStreamTestShortDeserQueue, TestNoDeadlock) {
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
 
@@ -874,7 +704,7 @@ TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
 }
 
 // Test that payloads larger than the service queue's soft mem limit can be transmitted.
-TEST_P(DataStreamTestShortServiceQueue, TestLargePayload) {
+TEST_F(DataStreamTestShortServiceQueue, TestLargePayload) {
   TestStream(
       TPartitionType::UNPARTITIONED, 4, 1, SHORT_SERVICE_QUEUE_MEM_LIMIT * 2, false);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 349b817..319e948 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -35,7 +35,6 @@
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
-#include "runtime/data-stream-mgr.h"
 #include "runtime/hbase-table-factory.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/disk-io-mgr.h"
@@ -80,8 +79,6 @@ DEFINE_int32(state_store_subscriber_port, 23000,
     "port where StatestoreSubscriberService should be exported");
 DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
-DEFINE_bool(use_krpc, true, "If true, use KRPC for the DataStream subsystem. "
-    "Otherwise use Thrift RPC.");
 
 DEFINE_bool_hidden(use_local_catalog, false,
   "Use experimental implementation of a local catalog. If this is set, "
@@ -159,15 +156,10 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, backend_port)) {
 
-  if (FLAGS_use_krpc) {
-    VLOG_QUERY << "Using KRPC.";
-    // KRPC relies on resolved IP address. It's set in Init().
-    krpc_address_.__set_port(krpc_port);
-    rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
-    stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
-  } else {
-    stream_mgr_.reset(new DataStreamMgr(metrics_.get()));
-  }
+  // KRPC relies on resolved IP address. It's set in Init().
+  krpc_address_.__set_port(krpc_port);
+  rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
+  stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
 
   request_pool_service_.reset(new RequestPoolService(metrics_.get()));
 
@@ -298,19 +290,17 @@ Status ExecEnv::Init() {
       "Buffer Pool: Unused Reservation", mem_tracker_.get()));
 
   // Initializes the RPCMgr and DataStreamServices.
-  if (FLAGS_use_krpc) {
-    krpc_address_.__set_hostname(ip_address_);
-    // Initialization needs to happen in the following order due to dependencies:
-    // - RPC manager, DataStreamService and DataStreamManager.
-    RETURN_IF_ERROR(rpc_mgr_->Init());
-    data_svc_.reset(new DataStreamService(rpc_metrics_));
-    RETURN_IF_ERROR(data_svc_->Init());
-    RETURN_IF_ERROR(KrpcStreamMgr()->Init(data_svc_->mem_tracker()));
-    // Bump thread cache to 1GB to reduce contention for TCMalloc central
-    // list's spinlock.
-    if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
-      FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
-    }
+  krpc_address_.__set_hostname(ip_address_);
+  // Initialization needs to happen in the following order due to dependencies:
+  // - RPC manager, DataStreamService and DataStreamManager.
+  RETURN_IF_ERROR(rpc_mgr_->Init());
+  data_svc_.reset(new DataStreamService(rpc_metrics_));
+  RETURN_IF_ERROR(data_svc_->Init());
+  RETURN_IF_ERROR(stream_mgr_->Init(data_svc_->mem_tracker()));
+  // Bump thread cache to 1GB to reduce contention for TCMalloc central
+  // list's spinlock.
+  if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
+    FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
   }
 
 #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
@@ -396,10 +386,8 @@ Status ExecEnv::StartStatestoreSubscriberService() {
 }
 
 Status ExecEnv::StartKrpcService() {
-  if (FLAGS_use_krpc) {
-    LOG(INFO) << "Starting KRPC service";
-    RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
-  }
+  LOG(INFO) << "Starting KRPC service";
+  RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
   return Status::OK();
 }
 
@@ -440,14 +428,4 @@ Status ExecEnv::GetKuduClient(
   return Status::OK();
 }
 
-DataStreamMgr* ExecEnv::ThriftStreamMgr() {
-  DCHECK(!FLAGS_use_krpc);
-  return dynamic_cast<DataStreamMgr*>(stream_mgr_.get());
-}
-
-KrpcDataStreamMgr* ExecEnv::KrpcStreamMgr() {
-  DCHECK(FLAGS_use_krpc);
-  return dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_.get());
-}
-
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 54a042a..3832d0d 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -41,8 +41,6 @@ namespace impala {
 class AdmissionController;
 class BufferPool;
 class CallableThreadPool;
-class DataStreamMgrBase;
-class DataStreamMgr;
 class DataStreamService;
 class QueryExecMgr;
 class Frontend;
@@ -109,13 +107,7 @@ class ExecEnv {
   /// StartServices() was successful.
   TNetworkAddress GetThriftBackendAddress() const;
 
-  DataStreamMgrBase* stream_mgr() { return stream_mgr_.get(); }
-
-  /// TODO: Remove once a single DataStreamMgrBase implementation is standardized on.
-  /// Clients of DataStreamMgrBase should use stream_mgr() unless they need to access
-  /// members that are not a part of the DataStreamMgrBase interface.
-  DataStreamMgr* ThriftStreamMgr();
-  KrpcDataStreamMgr* KrpcStreamMgr();
+  KrpcDataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
 
   ImpalaBackendClientCache* impalad_client_cache() {
     return impalad_client_cache_.get();
@@ -175,7 +167,7 @@ class ExecEnv {
  private:
   boost::scoped_ptr<ObjectPool> obj_pool_;
   boost::scoped_ptr<MetricGroup> metrics_;
-  boost::scoped_ptr<DataStreamMgrBase> stream_mgr_;
+  boost::scoped_ptr<KrpcDataStreamMgr> stream_mgr_;
   boost::scoped_ptr<Scheduler> scheduler_;
   boost::scoped_ptr<AdmissionController> admission_controller_;
   boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index af03a2b..11122c7 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -34,14 +34,14 @@
 #include "exec/scan-node.h"
 #include "runtime/exec-env.h"
 #include "runtime/backend-client.h"
-#include "runtime/runtime-filter-bank.h"
 #include "runtime/client-cache.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/runtime-state.h"
+#include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/query-state.h"
 #include "runtime/query-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/runtime-filter-bank.h"
+#include "runtime/runtime-state.h"
 #include "runtime/thread-resource-mgr.h"
 #include "scheduling/query-schedule.h"
 #include "util/debug-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index bc490dd..3b11c07 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -52,8 +52,8 @@
 /// TODO: We don't need millisecond precision here.
 const int32_t STREAM_EXPIRATION_TIME_MS = 300 * 1000;
 
-DECLARE_bool(use_krpc);
-DECLARE_int32(datastream_sender_timeout_ms);
+DEFINE_int32(datastream_sender_timeout_ms, 120000, "(Advanced) The time, in ms, that can "
+    "elapse  before a plan fragment will time-out trying to send the initial row batch.");
 DEFINE_int32(datastream_service_num_deserialization_threads, 16,
     "Number of threads for deserializing RPC requests deferred due to the receiver "
     "not ready or the soft limit of the receiver is reached.");
@@ -98,7 +98,7 @@ inline uint32_t KrpcDataStreamMgr::GetHashValue(
   return value;
 }
 
-shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
+shared_ptr<KrpcDataStreamRecvr> KrpcDataStreamMgr::CreateRecvr(
     const RowDescriptor* row_desc, const TUniqueId& finst_id, PlanNodeId dest_node_id,
     int num_senders, int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
     MemTracker* parent_tracker, BufferPool::ClientHandle* client) {

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index 3cd2191..5d7bc56 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -18,8 +18,6 @@
 #ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H
 #define IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H
 
-#include "runtime/data-stream-mgr-base.h"
-
 #include <list>
 #include <queue>
 #include <set>
@@ -30,7 +28,6 @@
 
 #include "common/status.h"
 #include "common/object-pool.h"
-#include "runtime/data-stream-mgr-base.h"
 #include "runtime/descriptors.h"  // for PlanNodeId
 #include "runtime/row-batch.h"
 #include "util/metrics.h"
@@ -225,7 +222,7 @@ struct EndDataStreamCtx {
 ///  time.
 ///  'total-senders-timedout-waiting-for-recvr-creation' - total number of senders that
 ///  timed-out while waiting for a receiver.
-class KrpcDataStreamMgr : public DataStreamMgrBase {
+class KrpcDataStreamMgr : public CacheLineAligned {
  public:
   KrpcDataStreamMgr(MetricGroup* metrics);
 
@@ -243,10 +240,10 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
   /// caller. 'client' is the BufferPool's client handle for allocating buffers.
   /// It's owned by the parent exchange node.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+  std::shared_ptr<KrpcDataStreamRecvr> CreateRecvr(const RowDescriptor* row_desc,
       const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
       int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-      MemTracker* parent_tracker, BufferPool::ClientHandle* client) override;
+      MemTracker* parent_tracker, BufferPool::ClientHandle* client);
 
   /// Handler for TransmitData() RPC.
   ///
@@ -286,7 +283,7 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   /// Cancels all receivers registered for fragment_instance_id immediately. The
   /// receivers will not accept any row batches after being cancelled. Any buffered
   /// row batches will not be freed until Close() is called on the receivers.
-  void Cancel(const TUniqueId& fragment_instance_id) override;
+  void Cancel(const TUniqueId& fragment_instance_id);
 
   /// Waits for maintenance thread and sender response thread pool to finish.
   ~KrpcDataStreamMgr();

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 3933e02..96cc25f 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -43,7 +43,6 @@
 
 #include "common/names.h"
 
-DECLARE_bool(use_krpc);
 DECLARE_int32(datastream_service_num_deserialization_threads);
 
 using kudu::MonoDelta;

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index 18454d7..e19f686 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -18,8 +18,6 @@
 #ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H
 #define IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H
 
-#include "data-stream-recvr-base.h"
-
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 
@@ -84,7 +82,7 @@ class TransmitDataResponsePB;
 /// - no new row batch or deferred RPCs should be added to a cancelled sender queue
 /// - Cancel() will drain the deferred RPCs queue and the row batch queue
 ///
-class KrpcDataStreamRecvr : public DataStreamRecvrBase {
+class KrpcDataStreamRecvr {
  public:
   ~KrpcDataStreamRecvr();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 6928ebc..5cc80ce 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -34,10 +34,10 @@
 #include "exprs/timezone_db.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
-#include "runtime/data-stream-mgr-base.h"
-#include "runtime/data-stream-recvr.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec-env.h"
+#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter-bank.h"
@@ -302,7 +302,7 @@ io::DiskIoMgr* RuntimeState::io_mgr() {
   return exec_env_->disk_io_mgr();
 }
 
-DataStreamMgrBase* RuntimeState::stream_mgr() {
+KrpcDataStreamMgr* RuntimeState::stream_mgr() {
   return exec_env_->stream_mgr();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 6d38fc6..78a9864 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -36,6 +36,7 @@ class BufferPool;
 class DataStreamRecvr;
 class DescriptorTbl;
 class Expr;
+class KrpcDataStreamMgr;
 class LlvmCodeGen;
 class MemTracker;
 class ObjectPool;
@@ -47,7 +48,6 @@ class TimestampValue;
 class ThreadResourcePool;
 class TUniqueId;
 class ExecEnv;
-class DataStreamMgrBase;
 class HBaseTableFactory;
 class TPlanFragmentCtx;
 class TPlanFragmentInstanceCtx;
@@ -107,7 +107,7 @@ class RuntimeState {
         : no_instance_id_;
   }
   ExecEnv* exec_env() { return exec_env_; }
-  DataStreamMgrBase* stream_mgr();
+  KrpcDataStreamMgr* stream_mgr();
   HBaseTableFactory* htable_factory();
   ImpalaBackendClientCache* impalad_client_cache();
   CatalogServiceClientCache* catalogd_client_cache();

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 2baffbb..20acc43 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -46,8 +46,6 @@ using namespace apache::thrift;
 using namespace org::apache::impala::fb;
 using namespace strings;
 
-DECLARE_bool(use_krpc);
-
 namespace impala {
 
 static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
@@ -75,12 +73,10 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
   // requests.
   local_backend_descriptor_.ip_address = ip;
   LOG(INFO) << "Scheduler using " << ip << " as IP address";
-  if (FLAGS_use_krpc) {
-    // KRPC relies on resolved IP address.
-    DCHECK(IsResolvedAddress(krpc_address));
-    DCHECK_EQ(krpc_address.hostname, ip);
-    local_backend_descriptor_.__set_krpc_address(krpc_address);
-  }
+  // KRPC relies on resolved IP address.
+  DCHECK(IsResolvedAddress(krpc_address));
+  DCHECK_EQ(krpc_address.hostname, ip);
+  local_backend_descriptor_.__set_krpc_address(krpc_address);
 
   coord_only_backend_config_.AddBackend(local_backend_descriptor_);
 
@@ -346,12 +342,10 @@ void Scheduler::ComputeFragmentExecParams(
         dest.__set_fragment_instance_id(dest_params->instance_exec_params[i].instance_id);
         const TNetworkAddress& host = dest_params->instance_exec_params[i].host;
         dest.__set_thrift_backend(host);
-        if (FLAGS_use_krpc) {
-          const TBackendDescriptor& desc = LookUpBackendDesc(executor_config, host);
-          DCHECK(desc.__isset.krpc_address);
-          DCHECK(IsResolvedAddress(desc.krpc_address));
-          dest.__set_krpc_backend(desc.krpc_address);
-        }
+        const TBackendDescriptor& desc = LookUpBackendDesc(executor_config, host);
+        DCHECK(desc.__isset.krpc_address);
+        DCHECK(IsResolvedAddress(desc.krpc_address));
+        dest.__set_krpc_backend(desc.krpc_address);
       }
 
       // enumerate senders consecutively;

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 1d42a99..b7892ff 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -74,14 +74,14 @@ Status DataStreamService::Init() {
 void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request,
     EndDataStreamResponsePB* response, RpcContext* rpc_context) {
   // CloseSender() is guaranteed to eventually respond to this RPC so we don't do it here.
-  ExecEnv::GetInstance()->KrpcStreamMgr()->CloseSender(request, response, rpc_context);
+  ExecEnv::GetInstance()->stream_mgr()->CloseSender(request, response, rpc_context);
 }
 
 void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
     TransmitDataResponsePB* response, RpcContext* rpc_context) {
   FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
   // AddData() is guaranteed to eventually respond to this RPC so we don't do it here.
-  ExecEnv::GetInstance()->KrpcStreamMgr()->AddData(request, response, rpc_context);
+  ExecEnv::GetInstance()->stream_mgr()->AddData(request, response, rpc_context);
 }
 
 template<typename ResponsePBType>

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-internal-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 53a62da..c479a7f 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -80,15 +80,6 @@ void ImpalaInternalService::ReportExecStatus(TReportExecStatusResult& return_val
   impala_server_->ReportExecStatus(return_val, params);
 }
 
-void ImpalaInternalService::TransmitData(TTransmitDataResult& return_val,
-    const TTransmitDataParams& params) {
-  FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
-  DCHECK(params.__isset.dest_fragment_instance_id);
-  DCHECK(params.__isset.sender_id);
-  DCHECK(params.__isset.dest_node_id);
-  impala_server_->TransmitData(return_val, params);
-}
-
 void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
     const TUpdateFilterParams& params) {
   FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 3285d9d..8d5ddd5 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -37,8 +37,6 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
       const TCancelQueryFInstancesParams& params);
   virtual void ReportExecStatus(TReportExecStatusResult& return_val,
       const TReportExecStatusParams& params);
-  virtual void TransmitData(TTransmitDataResult& return_val,
-      const TTransmitDataParams& params);
   virtual void UpdateFilter(TUpdateFilterResult& return_val,
       const TUpdateFilterParams& params);
   virtual void PublishFilter(TPublishFilterResult& return_val,

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 97fa70e..077f634 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -52,7 +52,6 @@
 #include "rpc/thrift-util.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
-#include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/lib-cache.h"
 #include "runtime/mem-tracker.h"
@@ -119,7 +118,6 @@ DECLARE_string(authorized_proxy_group_config);
 DECLARE_string(authorized_proxy_group_config_delimiter);
 DECLARE_bool(abort_on_config_error);
 DECLARE_bool(disk_spill_encryption);
-DECLARE_bool(use_krpc);
 DECLARE_bool(use_local_catalog);
 
 DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served."
@@ -1268,33 +1266,6 @@ void ImpalaServer::ReportExecStatus(
   request_state->UpdateBackendExecStatus(params).SetTStatus(&return_val);
 }
 
-void ImpalaServer::TransmitData(
-    TTransmitDataResult& return_val, const TTransmitDataParams& params) {
-  VLOG_ROW << "TransmitData(): instance_id=" << PrintId(params.dest_fragment_instance_id)
-           << " node_id=" << params.dest_node_id
-           << " #rows=" << params.row_batch.num_rows
-           << " sender_id=" << params.sender_id
-           << " eos=" << (params.eos ? "true" : "false");
-  // TODO: fix Thrift so we can simply take ownership of thrift_batch instead
-  // of having to copy its data
-  if (params.row_batch.num_rows > 0) {
-    Status status = exec_env_->ThriftStreamMgr()->AddData(
-        params.dest_fragment_instance_id, params.dest_node_id, params.row_batch,
-        params.sender_id);
-    status.SetTStatus(&return_val);
-    if (!status.ok()) {
-      // should we close the channel here as well?
-      return;
-    }
-  }
-
-  if (params.eos) {
-    exec_env_->ThriftStreamMgr()->CloseSender(
-        params.dest_fragment_instance_id, params.dest_node_id,
-        params.sender_id).SetTStatus(&return_val);
-  }
-}
-
 void ImpalaServer::InitializeConfigVariables() {
   // Set idle_session_timeout here to let the SET command return the value of
   // the command line option FLAGS_idle_session_timeout
@@ -1755,11 +1726,10 @@ void ImpalaServer::AddLocalBackendToStatestore(
   local_backend_descriptor.ip_address = exec_env_->ip_address();
   local_backend_descriptor.__set_proc_mem_limit(
       exec_env_->process_mem_tracker()->limit());
-  if (FLAGS_use_krpc) {
-    const TNetworkAddress& krpc_address = exec_env_->krpc_address();
-    DCHECK(IsResolvedAddress(krpc_address));
-    local_backend_descriptor.__set_krpc_address(krpc_address);
-  }
+  const TNetworkAddress& krpc_address = exec_env_->krpc_address();
+  DCHECK(IsResolvedAddress(krpc_address));
+  local_backend_descriptor.__set_krpc_address(krpc_address);
+
   subscriber_topic_updates->emplace_back(TTopicDelta());
   TTopicDelta& update = subscriber_topic_updates->back();
   update.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index a31734b..8b7af09 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -59,8 +59,6 @@ class TPlanExecParams;
 class TInsertResult;
 class TReportExecStatusArgs;
 class TReportExecStatusResult;
-class TTransmitDataArgs;
-class TTransmitDataResult;
 class TNetworkAddress;
 class TClientRequest;
 class TExecRequest;
@@ -273,8 +271,6 @@ class ImpalaServer : public ImpalaServiceIf,
   /// ImpalaInternalService rpcs
   void ReportExecStatus(TReportExecStatusResult& return_val,
       const TReportExecStatusParams& params);
-  void TransmitData(TTransmitDataResult& return_val,
-      const TTransmitDataParams& params);
   void UpdateFilter(TUpdateFilterResult& return_val,
       const TUpdateFilterParams& params);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/bin/run-all-tests.sh
----------------------------------------------------------------------
diff --git a/bin/run-all-tests.sh b/bin/run-all-tests.sh
index 83b3548..5f6831e 100755
--- a/bin/run-all-tests.sh
+++ b/bin/run-all-tests.sh
@@ -39,8 +39,6 @@ if "${CLUSTER_DIR}/admin" is_kerberized; then
 fi
 
 # Parametrized Test Options
-# Disable KRPC for test cluster and test execution
-: ${DISABLE_KRPC:=false}
 # Run FE Tests
 : ${FE_TEST:=true}
 # Run Backend Tests
@@ -77,11 +75,6 @@ if [[ "${ERASURE_CODING}" = true ]]; then
     --impalad_args=--default_query_options=allow_erasure_coded_files=true"
 fi
 
-# If KRPC tests are disabled, pass the flag to disable KRPC during cluster start.
-if [[ "${DISABLE_KRPC}" == "true" ]]; then
-  TEST_START_CLUSTER_ARGS="${TEST_START_CLUSTER_ARGS} --disable_krpc"
-fi
-
 # Indicates whether code coverage reports should be generated.
 : ${CODE_COVERAGE:=false}
 
@@ -130,12 +123,6 @@ if [[ "${TARGET_FILESYSTEM}" == "local" ]]; then
   COMMON_PYTEST_ARGS+=" --impalad=localhost:21000"
 fi
 
-# If KRPC tests are disabled, pass test_no_krpc flag to pytest.
-# This includes the end-to-end tests and the custom cluster tests.
-if [[ "${DISABLE_KRPC}" == "true" ]]; then
-  COMMON_PYTEST_ARGS+=" --test_no_krpc"
-fi
-
 # For logging when using run-step.
 LOG_DIR="${IMPALA_EE_TEST_LOGS_DIR}"
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 4d34d45..208cc35 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -60,8 +60,6 @@ parser.add_option("--state_store_args", dest="state_store_args", action="append"
 parser.add_option("--catalogd_args", dest="catalogd_args", action="append",
                   type="string", default=[],
                   help="Additional arguments to pass to the Catalog Service at startup")
-parser.add_option("--disable_krpc", dest="disable_krpc", action="store_true",
-                  default=False, help="Disable KRPC DataStream service during startup.")
 parser.add_option("--kill", "--kill_only", dest="kill_only", action="store_true",
                   default=False, help="Instead of starting the cluster, just kill all"
                   " the running impalads and the statestored.")
@@ -330,9 +328,6 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
           delay=delay_list[i],
           args=args)
 
-    if options.disable_krpc:
-      args = "-use_krpc=false {args}".format(args=args)
-
     # Appended at the end so they can override previous args.
     if i < len(per_impalad_args):
       args = "{args} {per_impalad_args}".format(

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 4797eba..bf48aaa 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -704,34 +704,6 @@ struct TCancelQueryFInstancesResult {
   1: optional Status.TStatus status
 }
 
-
-// TransmitData
-
-struct TTransmitDataParams {
-  1: required ImpalaInternalServiceVersion protocol_version
-
-  // required in V1
-  2: optional Types.TUniqueId dest_fragment_instance_id
-
-  // Id of this fragment in its role as a sender.
-  3: optional i32 sender_id
-
-  // required in V1
-  4: optional Types.TPlanNodeId dest_node_id
-
-  // optional in V1
-  5: optional Results.TRowBatch row_batch
-
-  // if set to true, indicates that no more row batches will be sent
-  // for this dest_node_id
-  6: optional bool eos
-}
-
-struct TTransmitDataResult {
-  // required in V1
-  1: optional Status.TStatus status
-}
-
 // Parameters for RequestPoolService.resolveRequestPool()
 // TODO: why is this here?
 struct TResolveRequestPoolParams {
@@ -881,10 +853,6 @@ service ImpalaInternalService {
   TCancelQueryFInstancesResult CancelQueryFInstances(
       1:TCancelQueryFInstancesParams params);
 
-  // Called by sender to transmit single row batch. Returns error indication
-  // if params.fragmentId or params.destNodeId are unknown or if data couldn't be read.
-  TTransmitDataResult TransmitData(1:TTransmitDataParams params);
-
   // Called by fragment instances that produce local runtime filters to deliver them to
   // the coordinator for aggregation and broadcast.
   TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params);

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index a5c285d..5947ce9 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -145,9 +145,6 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if use_exclusive_coordinators:
       cmd.append("--use_exclusive_coordinators")
 
-    if pytest.config.option.test_no_krpc:
-      cmd.append("--disable_krpc")
-
     if os.environ.get("ERASURE_CODING") == "true":
       cmd.append("--impalad_args=--default_query_options=allow_erasure_coded_files=true")
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/common/skip.py
----------------------------------------------------------------------
diff --git a/tests/common/skip.py b/tests/common/skip.py
index e84c75b..d23b260 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -89,10 +89,6 @@ class SkipIf:
   not_ec = pytest.mark.skipif(not IS_EC, reason="Erasure Coding needed")
   no_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM,
       reason="Secondary filesystem needed")
-  not_krpc = pytest.mark.skipif(pytest.config.option.test_no_krpc,
-      reason="Test is only supported when using KRPC.")
-  not_thrift = pytest.mark.skipif(not pytest.config.option.test_no_krpc,
-      reason="Test is only supported when using Thrift RPC.")
 
 class SkipIfIsilon:
   caching = pytest.mark.skipif(IS_ISILON, reason="SET CACHED not implemented for Isilon")

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/common/test_skip.py
----------------------------------------------------------------------
diff --git a/tests/common/test_skip.py b/tests/common/test_skip.py
deleted file mode 100644
index 3e7d281..0000000
--- a/tests/common/test_skip.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pytest
-
-from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIf
-
-class TestSkipIf(ImpalaTestSuite):
-  """
-  This suite tests the effectiveness of various SkipIf decorators.
-  TODO: Remove this once we have tests that make use of these decorators.
-  """
-
-  @classmethod
-  def get_workload(cls):
-    return 'functional-query'
-
-  @SkipIf.not_krpc
-  def test_skip_if_not_krpc(self):
-    assert not pytest.config.option.test_no_krpc
-
-  @SkipIf.not_thrift
-  def test_skip_if_not_thrift(self):
-    assert pytest.config.option.test_no_krpc

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/conftest.py b/tests/conftest.py
index a1f1859..f01ecb2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -116,10 +116,6 @@ def pytest_addoption(parser):
                    help=("Indicates that tests are being run against a remote cluster. "
                          "Some tests may be marked to skip or xfail on remote clusters."))
 
-  parser.addoption("--test_no_krpc", dest="test_no_krpc", action="store_true",
-                   default=False, help="Run all tests with KRPC disabled. This assumes "
-                   "that the test cluster has been started with --disable_krpc.")
-
   parser.addoption("--shard_tests", default=None,
                    help="If set to N/M (e.g., 3/5), will split the tests into "
                    "M partitions and run the Nth partition. 1-indexed.")

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/custom_cluster/test_krpc_mem_usage.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_mem_usage.py b/tests/custom_cluster/test_krpc_mem_usage.py
index d358baa..6b0fe6a 100644
--- a/tests/custom_cluster/test_krpc_mem_usage.py
+++ b/tests/custom_cluster/test_krpc_mem_usage.py
@@ -26,7 +26,6 @@ DATA_STREAM_MGR_METRIC = "Data Stream Manager Early RPCs"
 DATA_STREAM_SVC_METRIC = "Data Stream Service Queue"
 ALL_METRICS = [ DATA_STREAM_MGR_METRIC, DATA_STREAM_SVC_METRIC ]
 
-@SkipIf.not_krpc
 class TestKrpcMemUsage(CustomClusterTestSuite):
   """Test for memory usage tracking when using KRPC."""
   TEST_QUERY = "select count(c2.string_col) from \

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/custom_cluster/test_krpc_metrics.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_metrics.py b/tests/custom_cluster/test_krpc_metrics.py
index de16ccc..e728237 100644
--- a/tests/custom_cluster/test_krpc_metrics.py
+++ b/tests/custom_cluster/test_krpc_metrics.py
@@ -24,7 +24,6 @@ from tests.common.impala_cluster import ImpalaCluster
 from tests.common.skip import SkipIf, SkipIfBuildType
 from tests.verifiers.mem_usage_verifier import MemUsageVerifier
 
-@SkipIf.not_krpc
 class TestKrpcMetrics(CustomClusterTestSuite):
   """Test for KRPC metrics that require special arguments during cluster startup."""
   RPCZ_URL = 'http://localhost:25000/rpcz?json'

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/custom_cluster/test_rpc_exception.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_rpc_exception.py b/tests/custom_cluster/test_rpc_exception.py
index 2784e88..fc60c1a 100644
--- a/tests/custom_cluster/test_rpc_exception.py
+++ b/tests/custom_cluster/test_rpc_exception.py
@@ -68,12 +68,6 @@ class TestRPCException(CustomClusterTestSuite):
   def test_rpc_send_timed_out(self, vector):
     self.execute_test_query(None)
 
-  @SkipIf.not_thrift
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4")
-  def test_rpc_recv_closed_connection(self, vector):
-    self.execute_test_query("Called read on non-open socket")
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5")
   def test_rpc_recv_timed_out(self, vector):
@@ -94,12 +88,6 @@ class TestRPCException(CustomClusterTestSuite):
   def test_rpc_secure_send_timed_out(self, vector):
     self.execute_test_query(None)
 
-  @SkipIf.not_thrift
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=9")
-  def test_rpc_secure_recv_closed_connection(self, vector):
-    self.execute_test_query("TTransportException: Transport not open")
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=10")
   def test_rpc_secure_recv_timed_out(self, vector):

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/query_test/test_codegen.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_codegen.py b/tests/query_test/test_codegen.py
index 5bc0cc6..7436c13 100644
--- a/tests/query_test/test_codegen.py
+++ b/tests/query_test/test_codegen.py
@@ -52,7 +52,6 @@ class TestCodegen(ImpalaTestSuite):
     assert len(exec_options) > 0
     assert_codegen_enabled(result.runtime_profile, [1])
 
-  @SkipIf.not_krpc
   def test_datastream_sender_codegen(self, vector):
     """Test the KrpcDataStreamSender's codegen logic"""
     self.run_test_case('QueryTest/datastream-sender-codegen', vector)

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/webserver/test_web_pages.py
----------------------------------------------------------------------
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 60deca4..000e386 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -249,7 +249,6 @@ class TestWebPage(ImpalaTestSuite):
       assert any(pattern in t for t in thread_names), \
            "Could not find thread matching '%s'" % pattern
 
-  @SkipIf.not_krpc
   def test_krpc_rpcz(self):
     """Test that KRPC metrics are exposed in /rpcz and that they are updated when
     executing a query."""