You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2022/09/07 17:15:15 UTC

[impala] 03/03: IMPALA-10213: Add test for local vs remote scheduling

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 79e474d3109e1738c66fd01e9bf28a97b0df9c90
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Thu Aug 11 12:03:53 2022 -0700

    IMPALA-10213: Add test for local vs remote scheduling
    
    Impala already supports locality-aware scheduling with Ozone because
    Ozone returns location data on partitions. That data doesn't include
    specific storage ids in getStorageIds, so we skip a warning that would
    otherwise always trigger on Ozone.
    
    Updates executor groups to add an implicit alias mapping
    localhost -> 127.0.0.1 for local development with Ozone. HDFS
    translates localhost to 127.0.0.1 for host names in its location
    data, which Impala will identify as colocated with executors in the
    dev environment. Ozone doesn't, and the default Impala hostname is
    the machine hostname - not localhost - so without this change all
    HDFS access in the minicluster is local but all Ozone access is
    remote.
    
    Adds a test to verify local vs remote assignment by using custom
    clusters with hostnames that either do or don't match storage hostnames.
    
    Change-Id: I4e5606528404c3d4fd164c03dec8315345be5f6d
    Reviewed-on: http://gerrit.cloudera.org:8080/18841
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
---
 be/src/runtime/io/disk-io-mgr.cc                   |  1 -
 be/src/scheduling/executor-group.cc                |  6 +++
 .../org/apache/impala/planner/HdfsScanNode.java    |  6 ++-
 tests/common/skip.py                               |  2 -
 tests/custom_cluster/test_scheduler_locality.py    | 53 ++++++++++++++++++++++
 tests/metadata/test_stats_extrapolation.py         |  3 +-
 6 files changed, 65 insertions(+), 6 deletions(-)

diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index e3e181290..7195ad3a3 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -844,7 +844,6 @@ int DiskIoMgr::AssignQueue(
   DCHECK(!IsOSSPath(file, check_default_fs)); // OSS/JindoFS is always remote.
   DCHECK(!IsGcsPath(file, check_default_fs)); // GCS is always remote.
   DCHECK(!IsCosPath(file, check_default_fs)); // COS is always remote.
-  // TODO: why is Ozone sometimes local?
   DCHECK(!IsSFSPath(file, check_default_fs)); // SFS is always remote.
   if (disk_id == -1) {
     // disk id is unknown, assign it an arbitrary one.
diff --git a/be/src/scheduling/executor-group.cc b/be/src/scheduling/executor-group.cc
index 93dadc135..3ac6bd68d 100644
--- a/be/src/scheduling/executor-group.cc
+++ b/be/src/scheduling/executor-group.cc
@@ -96,6 +96,12 @@ void ExecutorGroup::AddExecutor(const BackendDescriptorPB& be_desc) {
   }
   be_descs.push_back(be_desc);
   executor_ip_map_[be_desc.address().hostname()] = be_desc.ip_address();
+  if (be_desc.ip_address() == "127.0.0.1") {
+    // Include localhost as an alias for filesystems that don't translate it.
+    LOG(INFO) << "Adding executor localhost alias for "
+              << be_desc.address().hostname() << " -> " << be_desc.ip_address();
+    executor_ip_map_["localhost"] = be_desc.ip_address();
+  }
 }
 
 void ExecutorGroup::RemoveExecutor(const BackendDescriptorPB& be_desc) {
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 708b9996b..a3c8de51d 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1367,6 +1367,7 @@ public class HdfsScanNode extends ScanNode {
     Preconditions.checkArgument(fileDesc.getNumFileBlocks() > 0);
     boolean fileDescMissingDiskIds = false;
     long fileMaxScanRangeBytes = 0;
+    FileSystemUtil.FsType fsType = partition.getFsType();
     for (int i = 0; i < fileDesc.getNumFileBlocks(); ++i) {
       FbFileBlock block = fileDesc.getFbFileBlock(i);
       int replicaHostCount = FileBlock.getNumReplicaHosts(block);
@@ -1388,7 +1389,10 @@ public class HdfsScanNode extends ScanNode {
         Integer globalHostIdx = analyzer.getHostIndex().getOrAddIndex(networkAddress);
         location.setHost_idx(globalHostIdx);
         if (fsHasBlocks && !fileDesc.getIsEc() && FileBlock.getDiskId(block, j) == -1) {
-          ++numScanRangesNoDiskIds_;
+          // Skip Ozone; it returns NULL storageIds and users can't do anything about it.
+          if (fsType != FileSystemUtil.FsType.OZONE) {
+            ++numScanRangesNoDiskIds_;
+          }
           fileDescMissingDiskIds = true;
         }
         location.setVolume_id(FileBlock.getDiskId(block, j));
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 1b4b86643..8f380419f 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -62,8 +62,6 @@ class SkipIfFS:
       reason="IMPALA-10562")
   late_filters = pytest.mark.skipif(IS_ISILON, reason="IMPALA-6998")
   read_past_eof = pytest.mark.skipif(IS_S3 or IS_GCS, reason="IMPALA-2512")
-  no_storage_ids = pytest.mark.skipif(IS_OZONE,
-      reason="Ozone does not return storage ids, IMPALA-10213")
   large_block_size = pytest.mark.skipif(IS_OZONE,
       reason="block size is larger than 128MB")
 
diff --git a/tests/custom_cluster/test_scheduler_locality.py b/tests/custom_cluster/test_scheduler_locality.py
new file mode 100644
index 000000000..bccbcfa82
--- /dev/null
+++ b/tests/custom_cluster/test_scheduler_locality.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Tests for local and remote disk scheduling.
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.network import get_external_ip
+
+
+LOCAL_ASSIGNMENTS_METRIC = "simple-scheduler.local-assignments.total"
+TOTAL_ASSIGNMENTS_METRIC = "simple-scheduler.assignments.total"
+
+
+class TestSchedulerLocality(CustomClusterTestSuite):
+  """Tests for local and remote disk scheduling."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @CustomClusterTestSuite.with_args(
+      impalad_args='--hostname=localhost', cluster_size=1)
+  def test_local_assignment(self, vector):
+    self.client.execute('select count(1) from functional.alltypes')
+    for impalad in self.cluster.impalads:
+      impalad.service.wait_for_metric_value(LOCAL_ASSIGNMENTS_METRIC, 1,
+          allow_greater=True)
+      assignments, local_assignments = impalad.service.get_metric_values([
+          TOTAL_ASSIGNMENTS_METRIC, LOCAL_ASSIGNMENTS_METRIC])
+      assert assignments == local_assignments
+
+  @CustomClusterTestSuite.with_args(
+      impalad_args='--hostname=' + get_external_ip(), cluster_size=1)
+  def test_remote_assignment(self, vector):
+    self.client.execute('select count(1) from functional.alltypes')
+    for impalad in self.cluster.impalads:
+      impalad.service.wait_for_metric_value(TOTAL_ASSIGNMENTS_METRIC, 1,
+          allow_greater=True)
+      assert impalad.service.get_metric_value(LOCAL_ASSIGNMENTS_METRIC) == 0
diff --git a/tests/metadata/test_stats_extrapolation.py b/tests/metadata/test_stats_extrapolation.py
index 5f613a495..605bf663c 100644
--- a/tests/metadata/test_stats_extrapolation.py
+++ b/tests/metadata/test_stats_extrapolation.py
@@ -17,7 +17,7 @@
 
 from os import path
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfEC, SkipIfFS
+from tests.common.skip import SkipIfEC
 from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
@@ -39,7 +39,6 @@ class TestStatsExtrapolation(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
         create_uncompressed_text_dimension(cls.get_workload()))
 
-  @SkipIfFS.no_storage_ids
   @SkipIfEC.contain_full_explain
   def test_stats_extrapolation(self, vector, unique_database):
     vector.get_value('exec_option')['num_nodes'] = 1