You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/02 03:24:04 UTC

[incubator-uniffle] branch master updated: [#872][FOLLOWUP] feat(tez): Add UmbilicalUtils to get Worker info from AM (#919)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6471c5e8 [#872][FOLLOWUP] feat(tez): Add UmbilicalUtils to get Worker info from AM (#919)
6471c5e8 is described below

commit 6471c5e8a9e11a8d9401d727a0e7d44b2287822e
Author: Qing <11...@qq.com>
AuthorDate: Fri Jun 2 11:23:58 2023 +0800

    [#872][FOLLOWUP] feat(tez): Add UmbilicalUtils to get Worker info from AM (#919)
    
    ### What changes were proposed in this pull request?
    Add UmbilicalUtils to get Worker info from AM
    
    ### Why are the changes needed?
    Fix: https://github.com/apache/incubator-uniffle/issues/872
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    unit test
---
 .../java/org/apache/tez/common/UmbilicalUtils.java | 112 +++++++++++++++++++++
 1 file changed, 112 insertions(+)

diff --git a/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java b/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
new file mode 100644
index 00000000..5b1a06ff
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+/**
+ * Static helpers that ask the Application Master (AM), over the
+ * {@link TezRemoteShuffleUmbilicalProtocol} RPC interface, which shuffle servers are assigned to a shuffle.
+ */
+public class UmbilicalUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(UmbilicalUtils.class);
+
+  private UmbilicalUtils() {
+  }
+
+  /**
+   * Builds a {@link JobConf} from {@link RssTezConfig#RSS_CONF_FILE} and reads the AM shuffle manager
+   * address out of it.
+   *
+   * @return host (left) and port (right) of the Application Master; the host falls back to the literal
+   *         {@code "null host"} and the port to {@code -1} when the corresponding key is not set
+   */
+  private static Pair<String, Integer> getAmHostPort() {
+    JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
+    String host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null host");
+    int port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
+    LOG.info("Got RssConf am info, host is: {}, port is: {}", host, port);
+    return new ImmutablePair<>(host, port);
+  }
+
+  /**
+   * Opens an RPC proxy to the Application Master as a remote user named after the application id, sends a
+   * {@link GetShuffleServerRequest} and extracts the partition-to-servers mapping from the response.
+   * The proxy is stopped before this method returns, whether or not the call succeeded.
+   *
+   * @param applicationId Application Id of this task
+   * @param conf  Configuration handed to {@code RPC.getProxy}
+   * @param taskAttemptId task Attempt Id
+   * @param shuffleId   computed using dagId, up dagName, down dagName by RssTezUtils.computeShuffleId() method.
+   * @return Shuffle Server Info by request Application Master, keyed by partition id
+   * @throws IOException if creating the proxy or the remote call fails with an I/O error
+   * @throws InterruptedException if the thread is interrupted while running the privileged action
+   * @throws TezException if the Application Master reports a Tez-level failure
+   */
+  private static Map<Integer, List<ShuffleServerInfo>> doRequestShuffleServer(
+      ApplicationId applicationId,
+      Configuration conf,
+      TezTaskAttemptID taskAttemptId,
+      int shuffleId) throws IOException, InterruptedException, TezException {
+    UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(applicationId.toString());
+
+    Pair<String, Integer> amHostPort = getAmHostPort();
+    final InetSocketAddress address = NetUtils.createSocketAddrForHost(amHostPort.getLeft(), amHostPort.getRight());
+    TezRemoteShuffleUmbilicalProtocol umbilical = taskOwner
+        .doAs(new PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
+          @Override
+          public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
+            return RPC.getProxy(TezRemoteShuffleUmbilicalProtocol.class,
+                TezRemoteShuffleUmbilicalProtocol.versionID,
+                address, conf);
+          }
+        });
+    try {
+      // NOTE(review): the meaning of the two 200 literals is defined by GetShuffleServerRequest,
+      // which is not visible here - confirm and consider naming them.
+      GetShuffleServerRequest request = new GetShuffleServerRequest(taskAttemptId, 200, 200, shuffleId);
+
+      GetShuffleServerResponse response = umbilical.getShuffleAssignments(request);
+      Map<Integer, List<ShuffleServerInfo>> partitionToServers = response.getShuffleAssignmentsInfoWritable()
+          .getShuffleAssignmentsInfo()
+          .getPartitionToServers();
+      LOG.info("RequestShuffleServer applicationId:{}, taskAttemptId:{}, host:{}, port:{}, shuffleId:{}, worker:{}",
+          applicationId, taskAttemptId, amHostPort.getLeft(), amHostPort.getRight(), shuffleId, partitionToServers);
+      return partitionToServers;
+    } finally {
+      // Release the RPC client held by the proxy; without this every request leaks a proxy.
+      RPC.stopProxy(umbilical);
+    }
+  }
+
+  /**
+   * Requests the shuffle server assignment for a shuffle from the Application Master.
+   *
+   * <p>Checked failures are logged rather than propagated. If the thread was interrupted, its interrupt
+   * flag is restored so callers further up the stack can still observe the interruption.
+   *
+   * @param applicationId Application Id of this task
+   * @param conf  Configuration handed to the RPC layer
+   * @param taskAttemptId task Attempt Id
+   * @param shuffleId   shuffle id the assignment is requested for
+   * @return partition id to assigned shuffle servers, or {@code null} if the request failed with an
+   *         {@link IOException}, {@link InterruptedException} or {@link TezException}
+   */
+  public static Map<Integer, List<ShuffleServerInfo>> requestShuffleServer(
+      ApplicationId applicationId,
+      Configuration conf,
+      TezTaskAttemptID taskAttemptId,
+      int shuffleId) {
+    try {
+      return doRequestShuffleServer(applicationId, conf, taskAttemptId, shuffleId);
+    } catch (IOException | InterruptedException | TezException e) {
+      if (e instanceof InterruptedException) {
+        // Do not swallow the interruption: put the flag back for the caller.
+        Thread.currentThread().interrupt();
+      }
+      // The exception is the trailing argument so the logger prints its stack trace.
+      LOG.error("Failed to requestShuffleServer, applicationId:{}, taskAttemptId:{}, shuffleId:{}",
+          applicationId, taskAttemptId, shuffleId, e);
+    }
+    return null;
+  }
+}