You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2021/08/10 02:20:41 UTC

[spark] branch branch-3.2 updated: [SPARK-36460][SHUFFLE] Pull out NoOpMergedShuffleFileManager inner class outside

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1a432fe  [SPARK-36460][SHUFFLE] Pull out NoOpMergedShuffleFileManager inner class outside
1a432fe is described below

commit 1a432fe6bb7e221a56852f671685910158a22dd2
Author: Venkata krishnan Sowrirajan <vs...@linkedin.com>
AuthorDate: Tue Aug 10 10:19:24 2021 +0800

    [SPARK-36460][SHUFFLE] Pull out NoOpMergedShuffleFileManager inner class outside
    
    ### What changes were proposed in this pull request?
    
    Move the NoOpMergedShuffleFileManager inner class out into its own top-level class. This is required because passing a dollar sign ($) in the value of the config `spark.shuffle.server.mergedShuffleFileManagerImpl` can be an issue. Currently, `spark.shuffle.server.mergedShuffleFileManagerImpl` is set by default to `org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager`. After this change, the default value will be set to `org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager`.
    
    ### Why are the changes needed?
    
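    Passing a `$` in the config value can be an issue; for example, shells or configuration tooling may interpret it as the start of a variable reference.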
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Modified existing unit tests.
    
    Closes #33688 from venkata91/SPARK-36460.
    
    Authored-by: Venkata krishnan Sowrirajan <vs...@linkedin.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
    (cherry picked from commit df0de83c4679a5c8a9c0e3c7995bc7d692e121f0)
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../apache/spark/network/util/TransportConf.java   |  4 +-
 .../network/shuffle/ExternalBlockHandler.java      | 58 ---------------
 .../shuffle/NoOpMergedShuffleFileManager.java      | 86 ++++++++++++++++++++++
 .../spark/network/yarn/YarnShuffleService.java     |  3 +-
 .../network/yarn/YarnShuffleServiceSuite.scala     |  8 +-
 5 files changed, 94 insertions(+), 65 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 8e7ecf5..69b8b25 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -378,13 +378,13 @@ public class TransportConf {
    * Class name of the implementation of MergedShuffleFileManager that merges the blocks
    * pushed to it when push-based shuffle is enabled. By default, push-based shuffle is disabled at
    * a cluster level because this configuration is set to
-   * 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'.
+   * 'org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager'.
    * To turn on push-based shuffle at a cluster level, set the configuration to
    * 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
    */
   public String mergedShuffleFileManagerImpl() {
     return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl",
-      "org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager");
+      "org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager");
   }
 
   /**
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index 71741f2..1e413f6 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -594,64 +594,6 @@ public class ExternalBlockHandler extends RpcHandler
     }
   }
 
-  /**
-   * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
-   * is not enabled.
-   *
-   * @since 3.1.0
-   */
-  public static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
-
-    // This constructor is needed because we use this constructor to instantiate an implementation
-    // of MergedShuffleFileManager using reflection.
-    // See YarnShuffleService#newMergedShuffleFileManagerInstance.
-    public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
-
-    @Override
-    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
-      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
-    }
-
-    @Override
-    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
-      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
-    }
-
-    @Override
-    public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
-      // No-Op. Do nothing.
-    }
-
-    @Override
-    public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
-      // No-Op. Do nothing.
-    }
-
-    @Override
-    public ManagedBuffer getMergedBlockData(
-        String appId,
-        int shuffleId,
-        int shuffleMergeId,
-        int reduceId,
-        int chunkId) {
-      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
-    }
-
-    @Override
-    public MergedBlockMeta getMergedBlockMeta(
-        String appId,
-        int shuffleId,
-        int shuffleMergeId,
-        int reduceId) {
-      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
-    }
-
-    @Override
-    public String[] getMergedBlockDirs(String appId) {
-      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
-    }
-  }
-
   @Override
   public void channelActive(TransportClient client) {
     metrics.activeConnections.inc();
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
new file mode 100644
index 0000000..f47bfc3
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
+ * is not enabled.
+ *
+ * @since 3.1.0
+ */
+public class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
+
+  // This constructor is needed because we use this constructor to instantiate an implementation
+  // of MergedShuffleFileManager using reflection.
+  // See YarnShuffleService#newMergedShuffleFileManagerInstance.
+  public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
+
+  @Override
+  public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+  }
+
+  @Override
+  public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
+    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+  }
+
+  @Override
+  public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
+    // No-Op. Do nothing.
+  }
+
+  @Override
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    // No-Op. Do nothing.
+  }
+
+  @Override
+  public ManagedBuffer getMergedBlockData(
+      String appId,
+      int shuffleId,
+      int shuffleMergeId,
+      int reduceId,
+      int chunkId) {
+    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+  }
+
+  @Override
+  public MergedBlockMeta getMergedBlockMeta(
+      String appId,
+      int shuffleId,
+      int shuffleMergeId,
+      int reduceId) {
+    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+  }
+
+  @Override
+  public String[] getMergedBlockDirs(String appId) {
+    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+  }
+}
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index cb6d5d0..ac16369 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.api.*;
 import org.apache.spark.network.shuffle.MergedShuffleFileManager;
+import org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager;
 import org.apache.spark.network.util.LevelDBProvider;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
@@ -284,7 +285,7 @@ public class YarnShuffleService extends AuxiliaryService {
       return mergeManagerSubClazz.getConstructor(TransportConf.class).newInstance(conf);
     } catch (Exception e) {
       logger.error("Unable to create an instance of {}", mergeManagerImplClassName);
-      return new ExternalBlockHandler.NoOpMergedShuffleFileManager(conf);
+      return new NoOpMergedShuffleFileManager(conf);
     }
   }
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index fb40973..b2025aa 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -43,7 +43,7 @@ import org.scalatest.matchers.should.Matchers._
 import org.apache.spark.SecurityManager
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.config._
-import org.apache.spark.network.shuffle.{ExternalBlockHandler, RemoteBlockPushResolver, ShuffleTestAccessor}
+import org.apache.spark.network.shuffle.{NoOpMergedShuffleFileManager, RemoteBlockPushResolver, ShuffleTestAccessor}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.Utils
@@ -434,9 +434,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
   test("create default merged shuffle file manager instance") {
     val mockConf = mock(classOf[TransportConf])
     when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
-      "org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager")
+      "org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager")
     val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
-    assert(mergeMgr.isInstanceOf[ExternalBlockHandler.NoOpMergedShuffleFileManager])
+    assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
   }
 
   test("create remote block push resolver instance") {
@@ -452,6 +452,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
       "org.apache.spark.network.shuffle.NotExistent")
     val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
-    assert(mergeMgr.isInstanceOf[ExternalBlockHandler.NoOpMergedShuffleFileManager])
+    assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org