Posted to issues@uniffle.apache.org by "jerqi (via GitHub)" <gi...@apache.org> on 2023/03/22 15:26:12 UTC

[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #693: feat: support stage recompute for spark clients

jerqi commented on code in PR #693:
URL: https://github.com/apache/incubator-uniffle/pull/693#discussion_r1144969670


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java:
##########
@@ -265,6 +266,17 @@ public class RssSparkConfig {
                    + " spark.rss.estimate.server.assignment.enabled"))
       .createWithDefault(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
 
+  public static final ConfigEntry<Integer> RSS_SHUFFLE_MANAGER_GRPC_PORT = createIntegerBuilder(

Review Comment:
   We need to add documentation for this feature.



##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import com.google.common.collect.Maps;
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.proto.RssProtos;
+import org.apache.uniffle.proto.ShuffleManagerGrpc;
+
+public class ShuffleManagerGrpcService extends ShuffleManagerGrpc.ShuffleManagerImplBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
+  // todo: unregister shuffle status when shuffle is gced.
+  private final Map<Integer, RssShuffleStatus> shuffleStatus = Maps.newConcurrentMap();

Review Comment:
   When do we clean this collection?
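One option for cleanup is to remove the entry when the shuffle is unregistered on the driver, for example from the Spark `ShuffleManager.unregisterShuffle(int shuffleId)` path. Below is a minimal sketch of that idea. It assumes a hypothetical `ShuffleStatusRegistry` that holds a placeholder `RssShuffleStatus`, which is not the PR's class:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a per-shuffle status registry whose entries are removed
// when the shuffle is unregistered, so the map does not grow for the whole app lifetime.
public class ShuffleStatusRegistry {

  // Placeholder for the PR's RssShuffleStatus; only what this sketch needs.
  static final class RssShuffleStatus {
    final int shuffleId;

    RssShuffleStatus(int shuffleId) {
      this.shuffleId = shuffleId;
    }
  }

  private final Map<Integer, RssShuffleStatus> shuffleStatus = new ConcurrentHashMap<>();

  // Creates the status lazily on the first fetch-failure report for a shuffle.
  RssShuffleStatus getOrCreate(int shuffleId) {
    return shuffleStatus.computeIfAbsent(shuffleId, RssShuffleStatus::new);
  }

  // Assumed to be called from the driver-side unregisterShuffle path.
  // Returns true if an entry was actually removed.
  boolean unregisterShuffle(int shuffleId) {
    return shuffleStatus.remove(shuffleId) != null;
  }

  int size() {
    return shuffleStatus.size();
  }
}
```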



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/ShuffleFetchFailedWrapper.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.reader;
+
+import java.io.IOException;
+
+import org.apache.spark.shuffle.FetchFailedException;
+import org.apache.spark.shuffle.RssSparkShuffleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Product2;
+import scala.collection.AbstractIterator;
+import scala.collection.Iterator;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
+
+public class ShuffleFetchFailedWrapper {

Review Comment:
   Could we choose a better name? Isn't it actually an iterator?
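The class does behave as an iterator decorator. It wraps the reader's iterator and turns an RSS fetch failure into something Spark can act on. Below is a minimal sketch of that shape using `java.util.Iterator`. The class name `RssFetchFailedIterator` and the stand-in `FetchFailure` exception are hypothetical. In the PR itself, the wrapper operates on a Scala `Iterator` and contacts the driver's shuffle manager to build the exception it throws:

```java
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical sketch: the "wrapper" expressed directly as an iterator decorator.
// A FetchFailure thrown by the underlying iterator is handed to onFailure, which
// returns the exception to rethrow (e.g. Spark's FetchFailedException).
public class RssFetchFailedIterator<T> implements Iterator<T> {

  // Stand-in for RssFetchFailedException from the PR.
  public static class FetchFailure extends RuntimeException {
    public FetchFailure(String msg) {
      super(msg);
    }
  }

  private final Iterator<T> delegate;
  private final Function<FetchFailure, RuntimeException> onFailure;

  public RssFetchFailedIterator(Iterator<T> delegate, Function<FetchFailure, RuntimeException> onFailure) {
    this.delegate = delegate;
    this.onFailure = onFailure;
  }

  @Override
  public boolean hasNext() {
    try {
      return delegate.hasNext();
    } catch (FetchFailure e) {
      throw onFailure.apply(e);
    }
  }

  @Override
  public T next() {
    try {
      return delegate.next();
    } catch (FetchFailure e) {
      throw onFailure.apply(e);
    }
  }
}
```

With this approach, the class itself is the iterator, so the separate builder plus `wrap()` step is no longer needed.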



##########
client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java:
##########
@@ -86,4 +86,6 @@ public class RssClientConfig {
   public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = "rss.estimate.task.concurrency.per.server";
   public static final int RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE = 80;
 
+  public static final String RSS_STAGE_RECOMPUTE = "rss.stage.reCompute";

Review Comment:
   `reCompute` seems weird; the other keys in this class are all lower case, e.g. `rss.estimate.task.concurrency.per.server`.



##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java:
##########
@@ -173,6 +175,20 @@ public Void apply(TaskContext context) {
     if (!(resultIter instanceof InterruptibleIterator)) {
       resultIter = new InterruptibleIterator<>(context, resultIter);
     }
+    // stage re-compute and shuffle manager server port are both set
+    if (rssConf.getBoolean(RssClientConfig.RSS_STAGE_RECOMPUTE, false)

Review Comment:
   Is this right? Should we use a Spark config option here instead?



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssFeatureMatrix.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+public class RssFeatureMatrix {

Review Comment:
   Could we move this method to `RssSparkShuffleUtils`?



##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/AbstractRssShuffleManagerBase.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.shuffle.manager;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Optional;
+
+import org.apache.spark.MapOutputTracker;
+import org.apache.spark.MapOutputTrackerMaster;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.SparkException;
+import org.apache.spark.shuffle.RssFeatureMatrix;
+import org.apache.spark.shuffle.SparkVersionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractRssShuffleManagerBase implements RssShuffleManagerBase {

Review Comment:
   Why did we choose to add an interface?



##########
proto/src/main/proto/Rss.proto:
##########
@@ -488,4 +488,26 @@ message CancelDecommissionRequest {
 message CancelDecommissionResponse {
   StatusCode status = 1;
   string retMsg = 2;
-}
\ No newline at end of file
+}
+// ShuffleManager service lives inside of compute-engine's application master, which handles rss shuffle specific logic
+// per application.
+service ShuffleManager {
+  rpc reportShuffleFetchFailure (ReportShuffleFetchFailureRequest) returns (ReportShuffleFetchFailureResponse);
+}
+
+message ReportShuffleFetchFailureRequest {
+  // appId normally should be omitted, it's used to avoid wrongly request issued from remaining executors of another
+  // app which accidentally has the same shuffle manager port with this app.
+  string appId = 1;
+  int32 shuffleId = 2;
+  int32 stageAttemptId = 3;
+  int32 partitionId = 4;
+  string exception = 5;
+  string serverId = 6;

Review Comment:
   What does `serverId` mean?



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/ShuffleFetchFailedWrapper.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.reader;
+
+import java.io.IOException;
+
+import org.apache.spark.shuffle.FetchFailedException;
+import org.apache.spark.shuffle.RssSparkShuffleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Product2;
+import scala.collection.AbstractIterator;
+import scala.collection.Iterator;
+
+import org.apache.uniffle.client.api.ShuffleManagerClient;
+import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
+
+public class ShuffleFetchFailedWrapper {
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleFetchFailedWrapper.class);
+  private RssConf conf;
+  private String appId;
+  private int shuffleId;
+  private int partitionId;
+  private int stageAttemptId;
+  private String reportServerHost;
+  private int reportServerPort;
+
+  private ShuffleFetchFailedWrapper() {
+
+  }
+
+  static ShuffleFetchFailedWrapper newWrapper(RssConf conf) {
+    ShuffleFetchFailedWrapper wrapper = new ShuffleFetchFailedWrapper();
+    wrapper.conf = conf;
+    return wrapper;
+  }
+
+  ShuffleFetchFailedWrapper appId(String appId) {
+    this.appId = appId;
+    return this;
+  }
+
+  ShuffleFetchFailedWrapper shuffleId(int shuffleId) {
+    this.shuffleId = shuffleId;
+    return this;
+  }
+
+  ShuffleFetchFailedWrapper partitionId(int partitionId) {
+    this.partitionId = partitionId;
+    return this;
+  }
+
+  ShuffleFetchFailedWrapper stageAttemptId(int stageAttemptId) {
+    this.stageAttemptId = stageAttemptId;
+    return this;
+  }
+
+  ShuffleFetchFailedWrapper reportServerHost(String host) {
+    this.reportServerHost = host;
+    return this;
+  }
+
+  ShuffleFetchFailedWrapper port(int port) {
+    this.reportServerPort = port;
+    return this;
+  }
+
+
+  public <K, C> Iterator<Product2<K, C>> wrap(Iterator<Product2<K, C>> iter) {
+    return new IteratorImpl<>(iter, this);
+  }
+
+  private static class IteratorImpl<K, C> extends AbstractIterator<Product2<K, C>> {
+    private Iterator<Product2<K, C>> iter;
+    private ShuffleFetchFailedWrapper wrapper;
+
+    IteratorImpl(Iterator<Product2<K, C>> iter, ShuffleFetchFailedWrapper wrapper) {
+      this.iter = iter;
+      this.wrapper = wrapper;
+    }
+
+    private RuntimeException generateFetchFailedIfNecessary(RssFetchFailedException e) {
+      String driver = wrapper.reportServerHost;
+      int port = wrapper.reportServerPort;
+      // todo: reuse this manager client if this is a bottleneck.
+      try (ShuffleManagerClient client = RssSparkShuffleUtils.createShuffleManagerClient(driver, port)) {

Review Comment:
   Do we need to handle `InterruptedException` here?
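If the report call can block, the usual Java idiom is to restore the thread's interrupt status rather than swallow it. That way, task-kill handling further up the stack still sees the interruption. Below is a sketch using a hypothetical `InterruptAwareReporter` helper. The `Callable` stands in for the blocking `ShuffleManagerClient` call, and the exception types chosen here are assumptions:

```java
import java.util.concurrent.Callable;

// Hypothetical sketch: run a blocking report call without losing an interrupt.
public final class InterruptAwareReporter {

  private InterruptAwareReporter() {}

  public static <T> T report(Callable<T> blockingCall) {
    try {
      return blockingCall.call();
    } catch (InterruptedException e) {
      // Re-assert the interrupt flag so callers (e.g. task-kill logic) can observe it.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while reporting fetch failure", e);
    } catch (RuntimeException e) {
      // Unchecked failures propagate unchanged.
      throw e;
    } catch (Exception e) {
      // Other checked failures are wrapped so the signature stays unchecked.
      throw new IllegalStateException("Failed to report fetch failure", e);
    }
  }
}
```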



##########
internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.factory;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleManagerGrpcClient;
+import org.apache.uniffle.common.ClientType;
+
+public class ShuffleManagerClientFactory {
+
+  private static final ShuffleManagerClientFactory INSTANCE = new ShuffleManagerClientFactory(ClientType.GRPC);

Review Comment:
   For singletons, our codebase prefers the lazy-holder idiom, as used in the class `ShuffleServerClientFactory`.
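For reference, the initialization-on-demand holder idiom looks roughly like the sketch below. This is a sketch only. `ClientType` is a local stand-in enum. The accessor name `getInstance()` and the private constructor are assumptions, not code copied from `ShuffleServerClientFactory`:

```java
// Hypothetical sketch of the lazy-holder singleton for this factory.
public class ShuffleManagerClientFactory {

  // Stand-in for org.apache.uniffle.common.ClientType.
  enum ClientType { GRPC, GRPC_NETTY }

  private final ClientType clientType;

  private ShuffleManagerClientFactory(ClientType clientType) {
    this.clientType = clientType;
  }

  // LazyHolder is initialized only on the first access to LazyHolder.INSTANCE.
  private static class LazyHolder {
    static final ShuffleManagerClientFactory INSTANCE = new ShuffleManagerClientFactory(ClientType.GRPC);
  }

  public static ShuffleManagerClientFactory getInstance() {
    return LazyHolder.INSTANCE;
  }

  public ClientType getClientType() {
    return clientType;
  }
}
```

The JVM initializes `LazyHolder` only when `getInstance()` first touches it. Class initialization is thread-safe, so no explicit locking is needed.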



##########
internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleManagerClientFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.factory;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleManagerGrpcClient;
+import org.apache.uniffle.common.ClientType;
+
+public class ShuffleManagerClientFactory {
+
+  private static final ShuffleManagerClientFactory INSTANCE = new ShuffleManagerClientFactory(ClientType.GRPC);
+
+  public static ShuffleManagerClientFactory getINSTANCE() {
+    return INSTANCE;
+  }
+
+  private ClientType clientType;
+
+  public ShuffleManagerClientFactory(ClientType clientType) {
+    this.clientType = clientType;
+  }
+
+  public ShuffleManagerGrpcClient createShuffleManagerClient(String host, int port) {
+    if (clientType.equals(ClientType.GRPC)) {

Review Comment:
   Will this cause problems when we use `ClientType.GRPC_NETTY`?
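With the current `if (clientType.equals(ClientType.GRPC))` check, `GRPC_NETTY` does not take the gRPC branch. One option is to accept both types explicitly. This assumes the driver-side `ShuffleManager` service is only ever exposed over gRPC. Below is a sketch with stand-in types. `ClientType`, `GrpcManagerClient` and `ManagerClientSelection` are all hypothetical here:

```java
// Hypothetical sketch: choose the manager client per client type.
public class ManagerClientSelection {

  // Stand-in for org.apache.uniffle.common.ClientType.
  enum ClientType { GRPC, GRPC_NETTY }

  // Stand-in for ShuffleManagerGrpcClient; only records the target address.
  static final class GrpcManagerClient {
    final String host;
    final int port;

    GrpcManagerClient(String host, int port) {
      this.host = host;
      this.port = port;
    }
  }

  static GrpcManagerClient create(ClientType type, String host, int port) {
    switch (type) {
      case GRPC:
      case GRPC_NETTY:
        // Assumption: Netty only affects data transfer with shuffle servers, so the
        // ShuffleManager endpoint is reached over gRPC in both cases.
        return new GrpcManagerClient(host, port);
      default:
        throw new UnsupportedOperationException("Unsupported client type " + type);
    }
  }
}
```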



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

