You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/01 20:17:31 UTC

[GitHub] [arrow-ballista] thinkharderdev commented on a diff in pull request #261: Task level retry and Stage level retry

thinkharderdev commented on code in PR #261:
URL: https://github.com/apache/arrow-ballista/pull/261#discussion_r985138176


##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -110,6 +114,20 @@ pub struct ExecutionGraph {
     output_partitions: usize,
     /// Locations of this `ExecutionGraph` final output locations
     output_locations: Vec<PartitionLocation>,
+    /// Task ID generator, generate unique TID in the execution graph
+    task_id_gen: usize,
+    /// Failed stage attempts, record the failed stage attempts to limit the retry times.
+    /// Map from Stage ID -> Set<Stage_ATTPMPT_NUM>
+    failed_stage_attempts: HashMap<usize, HashSet<usize>>,

Review Comment:
   I don't get why we are saving a `HashSet` of attempts. Shouldn't it just be `usize`?



##########
ballista/rust/core/src/error.rs:
##########
@@ -182,13 +200,78 @@ impl Display for BallistaError {
             // }
             BallistaError::TonicError(desc) => write!(f, "Tonic error: {}", desc),
             BallistaError::GrpcError(desc) => write!(f, "Grpc error: {}", desc),
+            BallistaError::GrpcConnectionError(desc) => {
+                write!(f, "Grpc connection error: {}", desc)
+            }
             BallistaError::Internal(desc) => {
                 write!(f, "Internal Ballista error: {}", desc)
             }
             BallistaError::TokioError(desc) => write!(f, "Tokio join error: {}", desc),
+            BallistaError::GrpcActionError(desc) => {
+                write!(f, "Grpc Execute Action error: {}", desc)
+            }
+            BallistaError::FetchFailed(executor_id, map_stage, map_partition, desc) => {
+                write!(
+                    f,
+                    "Shuffle fetch partition error from Executor {}, map_stage {}, \
+                map_partition {}, error desc: {}",
+                    executor_id, map_stage, map_partition, desc
+                )
+            }
             BallistaError::Cancelled => write!(f, "Task cancelled"),
         }
     }
 }
 
+impl From<BallistaError> for FailedTask {
+    fn from(e: BallistaError) -> Self {
+        match e {
+            BallistaError::FetchFailed(
+                executor_id,
+                map_stage_id,
+                map_partition_id,
+                desc,
+            ) => {
+                FailedTask {
+                    error: desc,
+                    // fetch partition error is considered to be non-retryable
+                    retryable: false,
+                    count_to_failures: false,
+                    failed_reason: Some(FailedReason::FetchPartitionError(
+                        FetchPartitionError {
+                            executor_id,
+                            map_stage_id: map_stage_id as u32,
+                            map_partition_id: map_partition_id as u32,
+                        },
+                    )),
+                }
+            }
+            BallistaError::IoError(io) => {
+                FailedTask {
+                    error: format!("Task failed due to Ballista IO error: {:?}", io),
+                    // IO error is considered to be temporary and retryable
+                    retryable: true,
+                    count_to_failures: true,
+                    failed_reason: Some(FailedReason::IoError(IoError {})),
+                }
+            }
+            BallistaError::DataFusionError(DataFusionError::IoError(io)) => {
+                FailedTask {
+                    error: format!("Task failed due to DataFusion IO error: {:?}", io),
+                    // IO error is considered to be temporary and retryable
+                    retryable: true,
+                    count_to_failures: true,

Review Comment:
   I'm not sure I understand `retryable` and `count_to_failures`. If a task is retryable, wouldn't it count toward failures in all cases? Conversely, if it's not retryable, then we don't need to count the failure at all.



##########
ballista/rust/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -250,48 +261,72 @@ fn send_fetch_partitions(
     AbortableReceiverStream::create(response_receiver, join_handles)
 }
 
-#[cfg(not(test))]
-async fn fetch_partition(
-    location: &PartitionLocation,
-) -> Result<SendableRecordBatchStream> {
-    let metadata = &location.executor_meta;
-    let partition_id = &location.partition_id;
-    // TODO for shuffle client connections, we should avoid creating new connections again and again.
-    // And we should also avoid to keep alive too many connections for long time.
-    let host = metadata.host.as_str();
-    let port = metadata.port as u16;
-    let mut ballista_client = BallistaClient::try_new(host, port)
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-    ballista_client
-        .fetch_partition(
-            &partition_id.job_id,
-            partition_id.stage_id as usize,
-            partition_id.partition_id as usize,
-            &location.path,
-            host,
-            port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))
+/// Partition reader Trait, different partition reader can have
+#[async_trait]
+trait PartitionReader: Send + Sync + Clone {
+    // Read partition data from PartitionLocation
+    async fn fetch_partition(
+        &self,
+        location: &PartitionLocation,
+    ) -> result::Result<SendableRecordBatchStream, BallistaError>;
 }
 
-#[cfg(test)]
-async fn fetch_partition(
-    location: &PartitionLocation,
-) -> Result<SendableRecordBatchStream> {
-    tests::fetch_test_partition(location)
+#[derive(Clone)]
+struct FlightPartitionReader {}
+
+#[async_trait]
+impl PartitionReader for FlightPartitionReader {
+    async fn fetch_partition(
+        &self,
+        location: &PartitionLocation,
+    ) -> result::Result<SendableRecordBatchStream, BallistaError> {
+        let metadata = &location.executor_meta;
+        let partition_id = &location.partition_id;
+        // TODO for shuffle client connections, we should avoid creating new connections again and again.
+        // And we should also avoid to keep alive too many connections for long time.
+        let host = metadata.host.as_str();
+        let port = metadata.port as u16;
+        let mut ballista_client =
+            BallistaClient::try_new(host, port)
+                .await
+                .map_err(|error| match error {
+                    // map grpc connection error to partition fetch error.
+                    BallistaError::GrpcConnectionError(msg) => {
+                        BallistaError::FetchFailed(
+                            metadata.id.clone(),
+                            partition_id.stage_id,
+                            partition_id.partition_id,
+                            msg,
+                        )
+                    }
+                    other => other,
+                })?;
+
+        ballista_client
+            .fetch_partition(&metadata.id, partition_id, &location.path, host, port)
+            .await
+    }
 }
 
+#[allow(dead_code)]
+// TODO
+struct LocalPartitionReader {}

Review Comment:
   Nice! I assume where this is going is making the executors smart enough to read partitions on the same physical machine directly from disk.



##########
ballista/rust/core/proto/ballista.proto:
##########
@@ -664,15 +698,48 @@ message RunningTask {
 
 message FailedTask {
   string error = 1;
+  bool retryable = 2;
+  // Whether this task failure should be counted to the maximum number of times the task is allowed to retry
+  bool count_to_failures = 3;
+  oneof failed_reason {
+    ExecutionError execution_error = 4;
+    FetchPartitionError fetch_partition_error = 5;
+    IOError io_error = 6;
+    ExecutorLost executor_lost = 7;
+    // A successful task's result is lost due to executor lost
+    ResultLost result_lost = 8;
+    TaskKilled task_killed = 9;
+  }
 }
 
-message CompletedTask {
+message SuccessfulTask {
   string executor_id = 1;
   // TODO tasks are currently always shuffle writes but this will not always be the case
   // so we might want to think about some refactoring of the task definitions
   repeated ShuffleWritePartition partitions = 2;
 }
 
+message ExecutionError {
+}
+
+message FetchPartitionError {
+  string executor_id = 1;
+  uint32 map_stage_id = 2;
+  uint32 map_partition_id = 3;
+}
+
+message IOError {
+}
+
+message ExecutorLost {
+}
+
+message ResultLost {
+}

Review Comment:
   Not sure I understand the difference between these two errors (`ExecutorLost` and `ResultLost`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org