You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/02 03:24:04 UTC
[incubator-uniffle] branch master updated: [#872][FOLLOWUP] feat(tez): Add UmbilicalUtils to get Worker info from AM (#919)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6471c5e8 [#872][FOLLOWUP] feat(tez): Add UmbilicalUtils to get Worker info from AM (#919)
6471c5e8 is described below
commit 6471c5e8a9e11a8d9401d727a0e7d44b2287822e
Author: Qing <11...@qq.com>
AuthorDate: Fri Jun 2 11:23:58 2023 +0800
[#872][FOLLOWUP] feat(tez): Add UmbilicalUtils to get Worker info from AM (#919)
### What changes were proposed in this pull request?
Add UmbilicalUtils to get Worker info from AM
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/872
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test
---
.../java/org/apache/tez/common/UmbilicalUtils.java | 112 +++++++++++++++++++++
1 file changed, 112 insertions(+)
diff --git a/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java b/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
new file mode 100644
index 00000000..5b1a06ff
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+public class UmbilicalUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(UmbilicalUtils.class);
+
+ private UmbilicalUtils() {
+ }
+
+ /**
+ *
+ * @return Get Application Master host and port from config file
+ */
+ private static Pair<String, Integer> getAmHostPort() {
+ JobConf conf = new JobConf(RssTezConfig.RSS_CONF_FILE);
+ String host = conf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null host");
+ int port = conf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
+ LOG.info("Got RssConf am info, host is: {}, port is: {}", host, port);
+ return new ImmutablePair<>(host, port);
+ }
+
+ /**
+ *
+ * @param applicationId Application Id of this task
+ * @param conf Configuration
+ * @param taskAttemptId task Attempt Id
+ * @param shuffleId computed using dagId, up dagName, down dagName by RssTezUtils.computeShuffleId() method.
+ * @return Shuffle Server Info by request Application Master
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws TezException
+ */
+ private static Map<Integer, List<ShuffleServerInfo>> doRequestShuffleServer(
+ ApplicationId applicationId,
+ Configuration conf,
+ TezTaskAttemptID taskAttemptId,
+ int shuffleId) throws IOException, InterruptedException, TezException {
+ UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(applicationId.toString());
+
+ Pair<String, Integer> amHostPort = getAmHostPort();
+ final InetSocketAddress address = NetUtils.createSocketAddrForHost(amHostPort.getLeft(), amHostPort.getRight());
+ TezRemoteShuffleUmbilicalProtocol umbilical = taskOwner
+ .doAs(new PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
+ @Override
+ public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
+ return RPC.getProxy(TezRemoteShuffleUmbilicalProtocol.class,
+ TezRemoteShuffleUmbilicalProtocol.versionID,
+ address, conf);
+ }
+ });
+ GetShuffleServerRequest request = new GetShuffleServerRequest(taskAttemptId, 200, 200, shuffleId);
+
+ GetShuffleServerResponse response = umbilical.getShuffleAssignments(request);
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers = response.getShuffleAssignmentsInfoWritable()
+ .getShuffleAssignmentsInfo()
+ .getPartitionToServers();
+ LOG.info("RequestShuffleServer applicationId:{}, taskAttemptId:{}, host:{}, port:{}, shuffleId:{}, worker:{}",
+ applicationId, taskAttemptId, amHostPort.getLeft(), amHostPort.getRight(), shuffleId, partitionToServers);
+ return partitionToServers;
+ }
+
+ public static Map<Integer, List<ShuffleServerInfo>> requestShuffleServer(
+ ApplicationId applicationId,
+ Configuration conf,
+ TezTaskAttemptID taskAttemptId,
+ int shuffleId) {
+ try {
+ return doRequestShuffleServer(applicationId, conf, taskAttemptId,shuffleId);
+ } catch (IOException | InterruptedException | TezException e) {
+ LOG.error("Failed to requestShuffleServer, applicationId:{}, taskAttemptId:{}, shuffleId:{}, worker:{}",
+ applicationId, taskAttemptId, shuffleId, e);
+ }
+ return null;
+ }
+}