You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2021/08/10 02:20:41 UTC
[spark] branch branch-3.2 updated: [SPARK-36460][SHUFFLE] Pull out
NoOpMergedShuffleFileManager inner class outside
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 1a432fe [SPARK-36460][SHUFFLE] Pull out NoOpMergedShuffleFileManager inner class outside
1a432fe is described below
commit 1a432fe6bb7e221a56852f671685910158a22dd2
Author: Venkata krishnan Sowrirajan <vs...@linkedin.com>
AuthorDate: Tue Aug 10 10:19:24 2021 +0800
[SPARK-36460][SHUFFLE] Pull out NoOpMergedShuffleFileManager inner class outside
### What changes were proposed in this pull request?
Pull the NoOpMergedShuffleFileManager inner class out of ExternalBlockHandler into its own top-level class. This is required since passing a dollar sign ($) in the value of the config (`spark.shuffle.server.mergedShuffleFileManagerImpl`) can be an issue. Currently `spark.shuffle.server.mergedShuffleFileManagerImpl` is set by default to `org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager`. After this change the default value will be set to `org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager`.
### Why are the changes needed?
Passing `$` for the config value can be an issue.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified existing unit tests.
Closes #33688 from venkata91/SPARK-36460.
Authored-by: Venkata krishnan Sowrirajan <vs...@linkedin.com>
Signed-off-by: Gengliang Wang <ge...@apache.org>
(cherry picked from commit df0de83c4679a5c8a9c0e3c7995bc7d692e121f0)
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
.../apache/spark/network/util/TransportConf.java | 4 +-
.../network/shuffle/ExternalBlockHandler.java | 58 ---------------
.../shuffle/NoOpMergedShuffleFileManager.java | 86 ++++++++++++++++++++++
.../spark/network/yarn/YarnShuffleService.java | 3 +-
.../network/yarn/YarnShuffleServiceSuite.scala | 8 +-
5 files changed, 94 insertions(+), 65 deletions(-)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 8e7ecf5..69b8b25 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -378,13 +378,13 @@ public class TransportConf {
* Class name of the implementation of MergedShuffleFileManager that merges the blocks
* pushed to it when push-based shuffle is enabled. By default, push-based shuffle is disabled at
* a cluster level because this configuration is set to
- * 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'.
+ * 'org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager'.
* To turn on push-based shuffle at a cluster level, set the configuration to
* 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
*/
public String mergedShuffleFileManagerImpl() {
return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl",
- "org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager");
+ "org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager");
}
/**
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index 71741f2..1e413f6 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -594,64 +594,6 @@ public class ExternalBlockHandler extends RpcHandler
}
}
- /**
- * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
- * is not enabled.
- *
- * @since 3.1.0
- */
- public static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
-
- // This constructor is needed because we use this constructor to instantiate an implementation
- // of MergedShuffleFileManager using reflection.
- // See YarnShuffleService#newMergedShuffleFileManagerInstance.
- public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
-
- @Override
- public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
- throw new UnsupportedOperationException("Cannot handle shuffle block merge");
- }
-
- @Override
- public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
- throw new UnsupportedOperationException("Cannot handle shuffle block merge");
- }
-
- @Override
- public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
- // No-Op. Do nothing.
- }
-
- @Override
- public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
- // No-Op. Do nothing.
- }
-
- @Override
- public ManagedBuffer getMergedBlockData(
- String appId,
- int shuffleId,
- int shuffleMergeId,
- int reduceId,
- int chunkId) {
- throw new UnsupportedOperationException("Cannot handle shuffle block merge");
- }
-
- @Override
- public MergedBlockMeta getMergedBlockMeta(
- String appId,
- int shuffleId,
- int shuffleMergeId,
- int reduceId) {
- throw new UnsupportedOperationException("Cannot handle shuffle block merge");
- }
-
- @Override
- public String[] getMergedBlockDirs(String appId) {
- throw new UnsupportedOperationException("Cannot handle shuffle block merge");
- }
- }
-
@Override
public void channelActive(TransportClient client) {
metrics.activeConnections.inc();
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
new file mode 100644
index 0000000..f47bfc3
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
+ * is not enabled.
+ *
+ * @since 3.1.0
+ */
+public class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
+
+ // This constructor is needed because we use this constructor to instantiate an implementation
+ // of MergedShuffleFileManager using reflection.
+ // See YarnShuffleService#newMergedShuffleFileManagerInstance.
+ public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
+
+ @Override
+ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+ throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+ }
+
+ @Override
+ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
+ throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+ }
+
+ @Override
+ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
+ // No-Op. Do nothing.
+ }
+
+ @Override
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ // No-Op. Do nothing.
+ }
+
+ @Override
+ public ManagedBuffer getMergedBlockData(
+ String appId,
+ int shuffleId,
+ int shuffleMergeId,
+ int reduceId,
+ int chunkId) {
+ throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+ }
+
+ @Override
+ public MergedBlockMeta getMergedBlockMeta(
+ String appId,
+ int shuffleId,
+ int shuffleMergeId,
+ int reduceId) {
+ throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+ }
+
+ @Override
+ public String[] getMergedBlockDirs(String appId) {
+ throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+ }
+}
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index cb6d5d0..ac16369 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.*;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
+import org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager;
import org.apache.spark.network.util.LevelDBProvider;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
@@ -284,7 +285,7 @@ public class YarnShuffleService extends AuxiliaryService {
return mergeManagerSubClazz.getConstructor(TransportConf.class).newInstance(conf);
} catch (Exception e) {
logger.error("Unable to create an instance of {}", mergeManagerImplClassName);
- return new ExternalBlockHandler.NoOpMergedShuffleFileManager(conf);
+ return new NoOpMergedShuffleFileManager(conf);
}
}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index fb40973..b2025aa 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -43,7 +43,7 @@ import org.scalatest.matchers.should.Matchers._
import org.apache.spark.SecurityManager
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config._
-import org.apache.spark.network.shuffle.{ExternalBlockHandler, RemoteBlockPushResolver, ShuffleTestAccessor}
+import org.apache.spark.network.shuffle.{NoOpMergedShuffleFileManager, RemoteBlockPushResolver, ShuffleTestAccessor}
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.network.util.TransportConf
import org.apache.spark.util.Utils
@@ -434,9 +434,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
test("create default merged shuffle file manager instance") {
val mockConf = mock(classOf[TransportConf])
when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
- "org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager")
+ "org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager")
val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
- assert(mergeMgr.isInstanceOf[ExternalBlockHandler.NoOpMergedShuffleFileManager])
+ assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
}
test("create remote block push resolver instance") {
@@ -452,6 +452,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
"org.apache.spark.network.shuffle.NotExistent")
val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
- assert(mergeMgr.isInstanceOf[ExternalBlockHandler.NoOpMergedShuffleFileManager])
+ assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org