You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/16 04:27:12 UTC
[incubator-celeborn] branch branch-0.2 updated: [CELEBORN-192][FOLLOWUP] Remove unnecessary file and fix compile issues
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.2 by this push:
new 0589ad7f [CELEBORN-192][FOLLOWUP] Remove unnecessary file and fix compile issues
0589ad7f is described below
commit 0589ad7f52c3318eb1837a0056f0ee0a83a9d95a
Author: zky.zhoukeyong <zk...@alibaba-inc.com>
AuthorDate: Mon Jan 16 12:25:18 2023 +0800
[CELEBORN-192][FOLLOWUP] Remove unnecessary file and fix compile issues
---
.../apache/celeborn/client/LifecycleManager.scala | 4 +-
.../celeborn/client/commit/CommitHandler.scala | 486 ---------------------
2 files changed, 2 insertions(+), 488 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 5bee87dc..1f766fed 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1037,7 +1037,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
val res = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
res.status match {
case StatusCode.SUCCESS => // do nothing
- case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.FAILED =>
+ case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED =>
logDebug(s"Request $commitFiles return ${res.status} for " +
s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
commitFilesFailedWorkers.put(worker, (res.status, System.currentTimeMillis()))
@@ -1623,7 +1623,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
}
CommitFilesResponse(
- StatusCode.FAILED,
+ StatusCode.REQUEST_FAILED,
List.empty.asJava,
List.empty.asJava,
message.masterIds,
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
deleted file mode 100644
index e9ace39c..00000000
--- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ /dev/null
@@ -1,486 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.client.commit
-
-import java.nio.ByteBuffer
-import java.util
-import java.util.concurrent.{Callable, ConcurrentHashMap, TimeUnit}
-import java.util.concurrent.atomic.{AtomicLong, LongAdder}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import com.google.common.cache.{Cache, CacheBuilder}
-
-import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
-import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, ShuffleFailedWorkers, ShuffleFileGroups}
-import org.apache.celeborn.client.ShuffleCommittedInfo
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{PartitionLocationInfo, WorkerInfo}
-import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionType}
-import org.apache.celeborn.common.protocol.message.ControlMessages.{CommitFiles, CommitFilesResponse, GetReducerFileGroupResponse}
-import org.apache.celeborn.common.protocol.message.StatusCode
-import org.apache.celeborn.common.rpc.{RpcCallContext, RpcEndpointRef}
-import org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, RemoteNettyRpcCallContext}
-import org.apache.celeborn.common.util.{ThreadUtils, Utils}
-// Can Remove this if celeborn don't support scala211 in future
-import org.apache.celeborn.common.util.FunctionConverter._
-
-case class CommitResult(
- masterPartitionLocationMap: ConcurrentHashMap[String, PartitionLocation],
- slavePartitionLocationMap: ConcurrentHashMap[String, PartitionLocation],
- commitFilesFailedWorkers: ShuffleFailedWorkers)
-
-abstract class CommitHandler(
- appId: String,
- conf: CelebornConf,
- allocatedWorkers: ShuffleAllocatedWorkers,
- committedPartitionInfo: CommittedPartitionInfo) extends Logging {
-
- private val pushReplicateEnabled = conf.pushReplicateEnabled
- private val testRetryCommitFiles = conf.testRetryCommitFiles
- private val rpcCacheSize = conf.rpcCacheSize
- private val rpcCacheConcurrencyLevel = conf.rpcCacheConcurrencyLevel
- private val rpcCacheExpireTime = conf.rpcCacheExpireTime
-
- private val commitEpoch = new AtomicLong()
- private val totalWritten = new LongAdder
- private val fileCount = new LongAdder
- private val reducerFileGroupsMap = new ShuffleFileGroups
- // noinspection UnstableApiUsage
- private val getReducerFileGroupRpcCache: Cache[Int, ByteBuffer] = CacheBuilder.newBuilder()
- .concurrencyLevel(rpcCacheConcurrencyLevel)
- .expireAfterWrite(rpcCacheExpireTime, TimeUnit.MILLISECONDS)
- .maximumSize(rpcCacheSize)
- .build().asInstanceOf[Cache[Int, ByteBuffer]]
-
- def getPartitionType(): PartitionType
-
- def isStageEnd(shuffleId: Int): Boolean = false
-
- def isStageEndOrInProcess(shuffleId: Int): Boolean = false
-
- def isStageDataLost(shuffleId: Int): Boolean = false
-
- def setStageEnd(shuffleId: Int): Unit
-
- /**
- * return (waitStage isTimeOut, waitTime)
- */
- def waitStageEnd(shuffleId: Int): (Boolean, Long) = (true, 0)
-
- def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean = false
-
- def batchUnHandledRequests(shuffleId: Int, shuffleCommittedInfo: ShuffleCommittedInfo)
- : Map[WorkerInfo, collection.Set[PartitionLocation]] = {
- // When running to here, if handleStageEnd got lock first and commitFiles,
- // then this batch get this lock, commitPartitionRequests may contains
- // partitions which are already committed by stageEnd process.
- // But inProcessStageEndShuffleSet should have contain this shuffle id,
- // can directly return empty.
- if (this.isStageEndOrInProcess(shuffleId)) {
- logWarning(s"Shuffle $shuffleId ended or during processing stage end.")
- shuffleCommittedInfo.unHandledPartitionLocations.clear()
- Map.empty[WorkerInfo, Set[PartitionLocation]]
- } else {
- val currentBatch = this.getUnHandledPartitionLocations(shuffleId, shuffleCommittedInfo)
- shuffleCommittedInfo.unHandledPartitionLocations.clear()
- currentBatch.foreach { partitionLocation =>
- shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation)
- if (partitionLocation.getPeer != null) {
- shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation.getPeer)
- }
- }
-
- if (currentBatch.nonEmpty) {
- logWarning(s"Commit current batch HARD_SPLIT partitions for $shuffleId: " +
- s"${currentBatch.map(_.getUniqueId).mkString("[", ",", "]")}")
- val workerToRequests = currentBatch.flatMap { partitionLocation =>
- if (partitionLocation.getPeer != null) {
- Seq(partitionLocation, partitionLocation.getPeer)
- } else {
- Seq(partitionLocation)
- }
- }.groupBy(_.getWorker)
- workerToRequests
- } else {
- Map.empty[WorkerInfo, Set[PartitionLocation]]
- }
- }
- }
-
- protected def getUnHandledPartitionLocations(
- shuffleId: Int,
- shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation]
-
- def incrementInFlightNum(
- shuffleCommittedInfo: ShuffleCommittedInfo,
- workerToRequests: Map[WorkerInfo, collection.Set[PartitionLocation]]): Unit = {
- shuffleCommittedInfo.allInFlightCommitRequestNum.addAndGet(workerToRequests.size)
- }
-
- def decrementInFlightNum(
- shuffleCommittedInfo: ShuffleCommittedInfo,
- workerToRequests: Map[WorkerInfo, collection.Set[PartitionLocation]]): Unit = {
- shuffleCommittedInfo.allInFlightCommitRequestNum.addAndGet(-workerToRequests.size)
- }
-
- /**
- * when someone calls tryFinalCommit, the function will return true if there is no one ever do final commit before,
- * otherwise it will return false.
- *
- * @return
- */
- def tryFinalCommit(
- shuffleId: Int,
- recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean
-
- def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: Int): Unit = {
- if (isStageDataLost(shuffleId)) {
- context.reply(
- GetReducerFileGroupResponse(
- StatusCode.SHUFFLE_DATA_LOST,
- new ConcurrentHashMap(),
- Array.empty))
- } else {
- if (context.isInstanceOf[LocalNettyRpcCallContext]) {
- // This branch is for the UTs
- context.reply(GetReducerFileGroupResponse(
- StatusCode.SUCCESS,
- reducerFileGroupsMap.getOrDefault(shuffleId, new ConcurrentHashMap()),
- getMapperAttempts(shuffleId)))
- } else {
- val cachedMsg = getReducerFileGroupRpcCache.get(
- shuffleId,
- new Callable[ByteBuffer]() {
- override def call(): ByteBuffer = {
- val returnedMsg = GetReducerFileGroupResponse(
- StatusCode.SUCCESS,
- reducerFileGroupsMap.getOrDefault(shuffleId, new ConcurrentHashMap()),
- getMapperAttempts(shuffleId))
- context.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(returnedMsg)
- }
- })
- context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)
- }
- }
- }
-
- def removeExpiredShuffle(shuffleId: Int): Unit = {
- reducerFileGroupsMap.remove(shuffleId)
- }
-
- /**
- * For reduce partition if shuffle registered and corresponding map finished, reply true.
- * For map partition would always return false, as one mapper attempt finished don't mean mapper ended.
- */
- def isMapperEnded(shuffleId: Int, mapId: Int): Boolean = false
-
- def getMapperAttempts(shuffleId: Int): Array[Int]
-
- /**
- * return (thisMapperAttemptedFinishedSuccessOrNot, allMapperFinishedOrNot)
- */
- def finishMapperAttempt(
- shuffleId: Int,
- mapId: Int,
- attemptId: Int,
- numMappers: Int,
- partitionId: Int,
- recordWorkerFailure: ShuffleFailedWorkers => Unit): (Boolean, Boolean)
-
- def registerShuffle(shuffleId: Int, numMappers: Int): Unit = {
- reducerFileGroupsMap.put(shuffleId, new ConcurrentHashMap())
- }
-
- def parallelCommitFiles(
- shuffleId: Int,
- allocatedWorkers: util.Map[WorkerInfo, PartitionLocationInfo],
- partitionIdOpt: Option[Int] = None): CommitResult = {
- val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)
- val masterPartMap = new ConcurrentHashMap[String, PartitionLocation]
- val slavePartMap = new ConcurrentHashMap[String, PartitionLocation]
- val commitFilesFailedWorkers = new ShuffleFailedWorkers()
- val commitFileStartTime = System.nanoTime()
- val parallelism = Math.min(allocatedWorkers.size(), conf.rpcMaxParallelism)
- ThreadUtils.parmap(
- allocatedWorkers.asScala.to,
- "CommitFiles",
- parallelism) { case (worker, partitionLocationInfo) =>
- if (partitionLocationInfo.containsShuffle(shuffleId.toString)) {
- val masterParts =
- partitionLocationInfo.getMasterLocations(shuffleId.toString, partitionIdOpt)
- val slaveParts = partitionLocationInfo.getSlaveLocations(shuffleId.toString, partitionIdOpt)
- masterParts.asScala.foreach { p =>
- val partition = new PartitionLocation(p)
- partition.setFetchPort(worker.fetchPort)
- partition.setPeer(null)
- masterPartMap.put(partition.getUniqueId, partition)
- }
- slaveParts.asScala.foreach { p =>
- val partition = new PartitionLocation(p)
- partition.setFetchPort(worker.fetchPort)
- partition.setPeer(null)
- slavePartMap.put(partition.getUniqueId, partition)
- }
-
- val (masterIds, slaveIds) = shuffleCommittedInfo.synchronized {
- (
- masterParts.asScala
- .filterNot(shuffleCommittedInfo.handledPartitionLocations.contains)
- .map(_.getUniqueId).asJava,
- slaveParts.asScala
- .filterNot(shuffleCommittedInfo.handledPartitionLocations.contains)
- .map(_.getUniqueId).asJava)
- }
-
- commitFiles(
- appId,
- shuffleId,
- shuffleCommittedInfo,
- worker,
- masterIds,
- slaveIds,
- commitFilesFailedWorkers)
- }
- }
-
- logInfo(s"Shuffle $shuffleId " +
- s"commit files complete. File count ${shuffleCommittedInfo.currentShuffleFileCount.sum()} " +
- s"using ${(System.nanoTime() - commitFileStartTime) / 1000000} ms")
-
- CommitResult(masterPartMap, slavePartMap, commitFilesFailedWorkers)
- }
-
- def commitFiles(
- applicationId: String,
- shuffleId: Int,
- shuffleCommittedInfo: ShuffleCommittedInfo,
- worker: WorkerInfo,
- masterIds: util.List[String],
- slaveIds: util.List[String],
- commitFilesFailedWorkers: ShuffleFailedWorkers): Unit = {
-
- val res =
- if (!testRetryCommitFiles) {
- val commitFiles = CommitFiles(
- applicationId,
- shuffleId,
- masterIds,
- slaveIds,
- getMapperAttempts(shuffleId),
- commitEpoch.incrementAndGet())
- val res = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
-
- res.status match {
- case StatusCode.SUCCESS => // do nothing
- case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED =>
- logDebug(s"Request $commitFiles return ${res.status} for " +
- s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
- commitFilesFailedWorkers.put(worker, (res.status, System.currentTimeMillis()))
- case _ => // won't happen
- }
- res
- } else {
- // for test
- val commitFiles1 = CommitFiles(
- applicationId,
- shuffleId,
- masterIds.subList(0, masterIds.size() / 2),
- slaveIds.subList(0, slaveIds.size() / 2),
- getMapperAttempts(shuffleId),
- commitEpoch.incrementAndGet())
- val res1 = requestCommitFilesWithRetry(worker.endpoint, commitFiles1)
-
- val commitFiles = CommitFiles(
- applicationId,
- shuffleId,
- masterIds.subList(masterIds.size() / 2, masterIds.size()),
- slaveIds.subList(slaveIds.size() / 2, slaveIds.size()),
- getMapperAttempts(shuffleId),
- commitEpoch.incrementAndGet())
- val res2 = requestCommitFilesWithRetry(worker.endpoint, commitFiles)
-
- res1.committedMasterStorageInfos.putAll(res2.committedMasterStorageInfos)
- res1.committedSlaveStorageInfos.putAll(res2.committedSlaveStorageInfos)
- res1.committedMapIdBitMap.putAll(res2.committedMapIdBitMap)
- CommitFilesResponse(
- status = if (res1.status == StatusCode.SUCCESS) res2.status else res1.status,
- (res1.committedMasterIds.asScala ++ res2.committedMasterIds.asScala).toList.asJava,
- (res1.committedSlaveIds.asScala ++ res1.committedSlaveIds.asScala).toList.asJava,
- (res1.failedMasterIds.asScala ++ res1.failedMasterIds.asScala).toList.asJava,
- (res1.failedSlaveIds.asScala ++ res2.failedSlaveIds.asScala).toList.asJava,
- res1.committedMasterStorageInfos,
- res1.committedSlaveStorageInfos,
- res1.committedMapIdBitMap,
- res1.totalWritten + res2.totalWritten,
- res1.fileCount + res2.fileCount)
- }
-
- shuffleCommittedInfo.synchronized {
- // record committed partitionIds
- res.committedMasterIds.asScala.foreach({
- case commitMasterId =>
- val partitionUniqueIdList = shuffleCommittedInfo.committedMasterIds.computeIfAbsent(
- Utils.splitPartitionLocationUniqueId(commitMasterId)._1,
- (k: Int) => new util.ArrayList[String]())
- partitionUniqueIdList.add(commitMasterId)
- })
-
- res.committedSlaveIds.asScala.foreach({
- case commitSlaveId =>
- val partitionUniqueIdList = shuffleCommittedInfo.committedSlaveIds.computeIfAbsent(
- Utils.splitPartitionLocationUniqueId(commitSlaveId)._1,
- (k: Int) => new util.ArrayList[String]())
- partitionUniqueIdList.add(commitSlaveId)
- })
-
- // record committed partitions storage hint and disk hint
- shuffleCommittedInfo.committedMasterStorageInfos.putAll(res.committedMasterStorageInfos)
- shuffleCommittedInfo.committedSlaveStorageInfos.putAll(res.committedSlaveStorageInfos)
-
- // record failed partitions
- shuffleCommittedInfo.failedMasterPartitionIds.putAll(
- res.failedMasterIds.asScala.map((_, worker)).toMap.asJava)
- shuffleCommittedInfo.failedSlavePartitionIds.putAll(
- res.failedSlaveIds.asScala.map((_, worker)).toMap.asJava)
-
- shuffleCommittedInfo.committedMapIdBitmap.putAll(res.committedMapIdBitMap)
-
- totalWritten.add(res.totalWritten)
- fileCount.add(res.fileCount)
- shuffleCommittedInfo.currentShuffleFileCount.add(res.fileCount)
- }
- }
-
- def collectResult(
- shuffleId: Int,
- shuffleCommittedInfo: ShuffleCommittedInfo,
- masterPartitionUniqueIds: util.Iterator[String],
- slavePartitionUniqueIds: util.Iterator[String],
- masterPartMap: ConcurrentHashMap[String, PartitionLocation],
- slavePartMap: ConcurrentHashMap[String, PartitionLocation]): Unit = {
- val committedPartitions = new util.HashMap[String, PartitionLocation]
- masterPartitionUniqueIds.asScala.foreach { id =>
- masterPartMap.get(id).setStorageInfo(
- shuffleCommittedInfo.committedMasterStorageInfos.get(id))
- masterPartMap.get(id).setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
- committedPartitions.put(id, masterPartMap.get(id))
- }
-
- slavePartitionUniqueIds.asScala.foreach { id =>
- val slavePartition = slavePartMap.get(id)
- slavePartition.setStorageInfo(shuffleCommittedInfo.committedSlaveStorageInfos.get(id))
- val masterPartition = committedPartitions.get(id)
- if (masterPartition ne null) {
- masterPartition.setPeer(slavePartition)
- slavePartition.setPeer(masterPartition)
- } else {
- logInfo(s"Shuffle $shuffleId partition $id: master lost, " +
- s"use slave $slavePartition.")
- slavePartition.setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
- committedPartitions.put(id, slavePartition)
- }
- }
-
- committedPartitions.values().asScala.foreach { partition =>
- val partitionLocations = reducerFileGroupsMap.get(shuffleId).computeIfAbsent(
- partition.getId,
- (k: Integer) => new util.HashSet[PartitionLocation]())
- partitionLocations.add(partition)
- }
- }
-
- private def requestCommitFilesWithRetry(
- endpoint: RpcEndpointRef,
- message: CommitFiles): CommitFilesResponse = {
- val maxRetries = conf.requestCommitFilesMaxRetries
- var retryTimes = 0
- while (retryTimes < maxRetries) {
- try {
- if (testRetryCommitFiles && retryTimes < maxRetries - 1) {
- endpoint.ask[CommitFilesResponse](message)
- Thread.sleep(1000)
- throw new Exception("Mock fail for CommitFiles")
- } else {
- return endpoint.askSync[CommitFilesResponse](message)
- }
- } catch {
- case e: Throwable =>
- retryTimes += 1
- logError(
- s"AskSync CommitFiles for ${message.shuffleId} failed (attempt $retryTimes/$maxRetries).",
- e)
- }
- }
-
- CommitFilesResponse(
- StatusCode.REQUEST_FAILED,
- List.empty.asJava,
- List.empty.asJava,
- message.masterIds,
- message.slaveIds)
- }
-
- def checkDataLost(
- shuffleId: Int,
- masterPartitionUniqueIdMap: util.Map[String, WorkerInfo],
- slavePartitionUniqueIdMap: util.Map[String, WorkerInfo]): Boolean = {
- val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
- if (!pushReplicateEnabled && masterPartitionUniqueIdMap.size() != 0) {
- val msg =
- masterPartitionUniqueIdMap.asScala.map {
- case (partitionUniqueId, workerInfo) =>
- s"Lost partition $partitionUniqueId in worker [${workerInfo.readableAddress()}]"
- }.mkString("\n")
- logError(
- s"""
- |For shuffle $shuffleKey partition data lost:
- |$msg
- |""".stripMargin)
- true
- } else {
- val failedBothPartitionIdsToWorker = masterPartitionUniqueIdMap.asScala.flatMap {
- case (partitionUniqueId, worker) =>
- if (slavePartitionUniqueIdMap.asScala.contains(partitionUniqueId)) {
- Some(partitionUniqueId -> (worker, slavePartitionUniqueIdMap.get(partitionUniqueId)))
- } else {
- None
- }
- }
- if (failedBothPartitionIdsToWorker.nonEmpty) {
- val msg = failedBothPartitionIdsToWorker.map {
- case (partitionUniqueId, (masterWorker, slaveWorker)) =>
- s"Lost partition $partitionUniqueId " +
- s"in master worker [${masterWorker.readableAddress()}] and slave worker [$slaveWorker]"
- }.mkString("\n")
- logError(
- s"""
- |For shuffle $shuffleKey partition data lost:
- |$msg
- |""".stripMargin)
- true
- } else {
- false
- }
- }
- }
-
- def commitMetrics(): (Long, Long) = (totalWritten.sumThenReset(), fileCount.sumThenReset())
-}