Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/27 12:39:04 UTC

[GitHub] [spark] LuciferYang commented on a diff in pull request #39192: [SPARK-41423][CORE] Protobuf serializer for StageDataWrapper

LuciferYang commented on code in PR #39192:
URL: https://github.com/apache/spark/pull/39192#discussion_r1057610747


##########
core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto:
##########
@@ -390,3 +390,214 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message StageDataWrapper {
+  StageData info = 1;
+  repeated int64 job_ids = 2;
+  map<string, int64> locality = 3;
+}
+
+message TaskData {
+  int64 task_id = 1;
+  int32 index = 2;
+  int32 attempt = 3;
+  int32 partition_id = 4;
+  int64 launch_time = 5;
+  optional int64 result_fetch_start = 6;
+  optional int64 duration = 7;
+  string executor_id = 8;
+  string host = 9;
+  string status = 10;
+  string task_locality = 11;
+  bool speculative = 12;
+  repeated AccumulableInfo accumulator_updates = 13;
+  optional string error_message = 14;
+  optional TaskMetrics task_metrics = 15;
+  map<string, string> executor_logs = 16;
+  int64 scheduler_delay = 17;
+  int64 getting_result_time = 18;
+}
+
+message StageData {
+  enum StageStatus {

Review Comment:
   Why is `StageStatus` designed as an `enum` nested inside `StageData`?
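   
   (For reference, one possible reason to nest: proto3 enum value names live in the scope *enclosing* the enum, so nesting keeps names like `UNSPECIFIED` scoped under `StageData`. A hypothetical top-level alternative would typically need prefixed values instead:)
   
   ```
   // Hypothetical top-level form; values are prefixed because they would
   // otherwise share the file scope with every other top-level enum.
   enum StageStatus {
     STAGE_STATUS_UNSPECIFIED = 0;
     STAGE_STATUS_ACTIVE = 1;
     STAGE_STATUS_COMPLETE = 2;
     STAGE_STATUS_FAILED = 3;
     STAGE_STATUS_PENDING = 4;
     STAGE_STATUS_SKIPPED = 5;
   }
   ```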
   
   
   



##########
core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto:
##########
@@ -390,3 +390,214 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message StageDataWrapper {
+  StageData info = 1;
+  repeated int64 job_ids = 2;
+  map<string, int64> locality = 3;
+}
+
+message TaskData {
+  int64 task_id = 1;
+  int32 index = 2;
+  int32 attempt = 3;
+  int32 partition_id = 4;
+  int64 launch_time = 5;
+  optional int64 result_fetch_start = 6;
+  optional int64 duration = 7;
+  string executor_id = 8;
+  string host = 9;
+  string status = 10;
+  string task_locality = 11;
+  bool speculative = 12;
+  repeated AccumulableInfo accumulator_updates = 13;
+  optional string error_message = 14;
+  optional TaskMetrics task_metrics = 15;
+  map<string, string> executor_logs = 16;
+  int64 scheduler_delay = 17;
+  int64 getting_result_time = 18;
+}
+
+message StageData {
+  enum StageStatus {
+    UNSPECIFIED = 0;
+    ACTIVE = 1;
+    COMPLETE = 2;
+    FAILED = 3;
+    PENDING = 4;
+    SKIPPED = 5;
+  }
+
+  StageStatus status = 1;
+  int64 stage_id = 2;
+  int32 attempt_id = 3;
+  int32 num_tasks = 4;
+  int32 num_active_tasks = 5;
+  int32 num_complete_tasks = 6;
+  int32 num_failed_tasks = 7;
+  int32 num_killed_tasks = 8;
+  int32 num_completed_indices = 9;
+
+  optional int64 submission_time = 10;
+  optional int64 first_task_launched_time = 11;
+  optional int64 completion_time = 12;
+  optional string failure_reason = 13;
+
+  int64 executor_deserialize_time = 14;
+  int64 executor_deserialize_cpu_time = 15;
+  int64 executor_run_time = 16;
+  int64 executor_cpu_time = 17;
+  int64 result_size = 18;
+  int64 jvm_gc_time = 19;
+  int64 result_serialization_time = 20;
+  int64 memory_bytes_spilled = 21;
+  int64 disk_bytes_spilled = 22;
+  int64 peak_execution_memory = 23;
+  int64 input_bytes = 24;
+  int64 input_records = 25;
+  int64 output_bytes = 26;
+  int64 output_records = 27;
+  int64 shuffle_remote_blocks_fetched = 28;
+  int64 shuffle_local_blocks_fetched = 29;
+  int64 shuffle_fetch_wait_time = 30;
+  int64 shuffle_remote_bytes_read = 31;
+  int64 shuffle_remote_bytes_read_to_disk = 32;
+  int64 shuffle_local_bytes_read = 33;
+  int64 shuffle_read_bytes = 34;
+  int64 shuffle_read_records = 35;
+  int64 shuffle_write_bytes = 36;
+  int64 shuffle_write_time = 37;
+  int64 shuffle_write_records = 38;
+
+  string name = 39;
+  optional string description = 40;
+  string details = 41;
+  string scheduling_pool = 42;
+
+  repeated int64 rdd_ids = 43;
+  repeated AccumulableInfo accumulator_updates = 44;
+  map<int64, TaskData> tasks = 45;
+  map<string, ExecutorStageSummary> executor_summary = 46;

Review Comment:
   ditto



##########
core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto:
##########
@@ -390,3 +390,214 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message StageDataWrapper {
+  StageData info = 1;
+  repeated int64 job_ids = 2;
+  map<string, int64> locality = 3;
+}
+
+message TaskData {
+  int64 task_id = 1;
+  int32 index = 2;
+  int32 attempt = 3;
+  int32 partition_id = 4;
+  int64 launch_time = 5;
+  optional int64 result_fetch_start = 6;
+  optional int64 duration = 7;
+  string executor_id = 8;
+  string host = 9;
+  string status = 10;
+  string task_locality = 11;
+  bool speculative = 12;
+  repeated AccumulableInfo accumulator_updates = 13;
+  optional string error_message = 14;
+  optional TaskMetrics task_metrics = 15;
+  map<string, string> executor_logs = 16;
+  int64 scheduler_delay = 17;
+  int64 getting_result_time = 18;
+}
+
+message StageData {
+  enum StageStatus {
+    UNSPECIFIED = 0;
+    ACTIVE = 1;
+    COMPLETE = 2;
+    FAILED = 3;
+    PENDING = 4;
+    SKIPPED = 5;
+  }
+
+  StageStatus status = 1;
+  int64 stage_id = 2;
+  int32 attempt_id = 3;
+  int32 num_tasks = 4;
+  int32 num_active_tasks = 5;
+  int32 num_complete_tasks = 6;
+  int32 num_failed_tasks = 7;
+  int32 num_killed_tasks = 8;
+  int32 num_completed_indices = 9;
+
+  optional int64 submission_time = 10;
+  optional int64 first_task_launched_time = 11;
+  optional int64 completion_time = 12;
+  optional string failure_reason = 13;
+
+  int64 executor_deserialize_time = 14;
+  int64 executor_deserialize_cpu_time = 15;
+  int64 executor_run_time = 16;
+  int64 executor_cpu_time = 17;
+  int64 result_size = 18;
+  int64 jvm_gc_time = 19;
+  int64 result_serialization_time = 20;
+  int64 memory_bytes_spilled = 21;
+  int64 disk_bytes_spilled = 22;
+  int64 peak_execution_memory = 23;
+  int64 input_bytes = 24;
+  int64 input_records = 25;
+  int64 output_bytes = 26;
+  int64 output_records = 27;
+  int64 shuffle_remote_blocks_fetched = 28;
+  int64 shuffle_local_blocks_fetched = 29;
+  int64 shuffle_fetch_wait_time = 30;
+  int64 shuffle_remote_bytes_read = 31;
+  int64 shuffle_remote_bytes_read_to_disk = 32;
+  int64 shuffle_local_bytes_read = 33;
+  int64 shuffle_read_bytes = 34;
+  int64 shuffle_read_records = 35;
+  int64 shuffle_write_bytes = 36;
+  int64 shuffle_write_time = 37;
+  int64 shuffle_write_records = 38;
+
+  string name = 39;
+  optional string description = 40;
+  string details = 41;
+  string scheduling_pool = 42;
+
+  repeated int64 rdd_ids = 43;
+  repeated AccumulableInfo accumulator_updates = 44;
+  map<int64, TaskData> tasks = 45;

Review Comment:
   I see, hmm... should we encapsulate this map? proto3 does not allow the `optional` label on a map field, so wrapping the map in a message would let the field carry explicit presence (`tasks` is an `Option` on the Scala side).
   
   such as 
   
   ```
   optional TaskMap tasks = 45;
   
   message TaskMap {
     map<int64, TaskData> tasks = 1;
   }
   ```
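   
   With the wrapper, the deserializer can then tell an absent map apart from an empty one via the generated presence accessor. A rough Scala sketch, assuming protobuf-java codegen for the `optional TaskMap` field, where `binary` is the decoded `StoreTypes.StageData` and `deserializeTaskData` is a helper mirroring the serializer:
   
   ```scala
   import scala.collection.JavaConverters._
   
   // hasTasks()/getTasks() are generated because the field is `optional`
   val tasks: Option[Map[Long, TaskData]] =
     if (binary.hasTasks) {
       Some(binary.getTasks.getTasksMap.asScala.map {
         case (id, t) => (id.longValue, deserializeTaskData(t))
       }.toMap)
     } else {
       None
     }
   ```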
   
   also cc @gengliangwang 



##########
core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala:
##########
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import java.util.Date
+
+import scala.collection.JavaConverters._
+import org.apache.commons.collections4.MapUtils
+
+import org.apache.spark.status.StageDataWrapper
+import org.apache.spark.status.api.v1.{ExecutorMetricsDistributions, ExecutorPeakMetricsDistributions, InputMetricDistributions, InputMetrics, OutputMetricDistributions, OutputMetrics, ShuffleReadMetricDistributions, ShuffleReadMetrics, ShuffleWriteMetricDistributions, ShuffleWriteMetrics, SpeculationStageSummary, StageData, StageStatus, TaskData, TaskMetricDistributions, TaskMetrics}
+import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.util.Utils.weakIntern
+
+class StageDataWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[StageDataWrapper]
+
+  override def serialize(input: Any): Array[Byte] =
+    serialize(input.asInstanceOf[StageDataWrapper])
+
+  private def serialize(s: StageDataWrapper): Array[Byte] = {
+    val builder = StoreTypes.StageDataWrapper.newBuilder()
+    builder.setInfo(serializeStageData(s.info))
+    s.jobIds.foreach(id => builder.addJobIds(id.toLong))
+    s.locality.foreach { entry =>
+      builder.putLocality(entry._1, entry._2)
+    }
+    builder.build().toByteArray
+  }
+
+  private def serializeStageData(stageData: StageData): StoreTypes.StageData = {
+    val stageDataBuilder = StoreTypes.StageData.newBuilder()
+    stageDataBuilder
+      .setStatus(serializeStageStatus(stageData.status))
+      .setStageId(stageData.stageId.toLong)
+      .setAttemptId(stageData.attemptId)
+      .setNumTasks(stageData.numTasks)
+      .setNumActiveTasks(stageData.numActiveTasks)
+      .setNumCompleteTasks(stageData.numCompleteTasks)
+      .setNumFailedTasks(stageData.numFailedTasks)
+      .setNumKilledTasks(stageData.numKilledTasks)
+      .setNumCompletedIndices(stageData.numCompletedIndices)
+      .setExecutorDeserializeTime(stageData.executorDeserializeTime)
+      .setExecutorDeserializeCpuTime(stageData.executorDeserializeCpuTime)
+      .setExecutorRunTime(stageData.executorRunTime)
+      .setExecutorCpuTime(stageData.executorCpuTime)
+      .setResultSize(stageData.resultSize)
+      .setJvmGcTime(stageData.jvmGcTime)
+      .setResultSerializationTime(stageData.resultSerializationTime)
+      .setMemoryBytesSpilled(stageData.memoryBytesSpilled)
+      .setDiskBytesSpilled(stageData.diskBytesSpilled)
+      .setPeakExecutionMemory(stageData.peakExecutionMemory)
+      .setInputBytes(stageData.inputBytes)
+      .setInputRecords(stageData.inputRecords)
+      .setOutputBytes(stageData.outputBytes)
+      .setOutputRecords(stageData.outputRecords)
+      .setShuffleRemoteBlocksFetched(stageData.shuffleRemoteBlocksFetched)
+      .setShuffleLocalBlocksFetched(stageData.shuffleLocalBlocksFetched)
+      .setShuffleFetchWaitTime(stageData.shuffleFetchWaitTime)
+      .setShuffleRemoteBytesRead(stageData.shuffleRemoteBytesRead)
+      .setShuffleRemoteBytesReadToDisk(stageData.shuffleRemoteBytesReadToDisk)
+      .setShuffleLocalBytesRead(stageData.shuffleLocalBytesRead)
+      .setShuffleReadBytes(stageData.shuffleReadBytes)
+      .setShuffleReadRecords(stageData.shuffleReadRecords)
+      .setShuffleWriteBytes(stageData.shuffleWriteBytes)
+      .setShuffleWriteTime(stageData.shuffleWriteTime)
+      .setShuffleWriteRecords(stageData.shuffleWriteRecords)
+      .setName(stageData.name)
+      .setDetails(stageData.details)
+      .setSchedulingPool(stageData.schedulingPool)
+      .setResourceProfileId(stageData.resourceProfileId)
+    stageData.submissionTime.foreach { d =>
+      stageDataBuilder.setSubmissionTime(d.getTime)
+    }
+    stageData.firstTaskLaunchedTime.foreach { d =>
+      stageDataBuilder.setFirstTaskLaunchedTime(d.getTime)
+    }
+    stageData.completionTime.foreach { d =>
+      stageDataBuilder.setCompletionTime(d.getTime)
+    }
+    stageData.failureReason.foreach { fr =>
+      stageDataBuilder.setFailureReason(fr)
+    }
+    stageData.description.foreach { d =>
+      stageDataBuilder.setDescription(d)
+    }
+    stageData.rddIds.foreach(id => stageDataBuilder.addRddIds(id.toLong))
+    stageData.accumulatorUpdates.foreach { update =>
+      stageDataBuilder.addAccumulatorUpdates(Utils.serializeAccumulableInfo(update))
+    }

Review Comment:
   I think there are 3 choices for where to define the `serializeAccumulableInfo` function:
   
   1. Move it from the class `TaskDataWrapperSerializer` to a companion object of `TaskDataWrapperSerializer`
   2. Move it from the class `TaskDataWrapperSerializer` to a new object `AccumulableInfoSerializer` (a rough sketch follows this list)
   3. Keep the status quo and let `StageDataWrapperSerializer` hold a `TaskDataWrapperSerializer` instance
   
   Similar suggestions apply to `deserializeAccumulableInfo`/`serializeExecutorStageSummary`/`deserializeExecutorStageSummary`, and I think `Utils` should be reserved for more general functions.
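   
   A rough sketch of option 2, assuming the proto `AccumulableInfo` message mirrors the fields of `org.apache.spark.status.api.v1.AccumulableInfo` (`id`, `name`, `update`, `value`):
   
   ```scala
   import org.apache.spark.status.api.v1.AccumulableInfo
   
   private[protobuf] object AccumulableInfoSerializer {
   
     def serialize(input: AccumulableInfo): StoreTypes.AccumulableInfo = {
       val builder = StoreTypes.AccumulableInfo.newBuilder()
         .setId(input.id)
         .setName(input.name)
         .setValue(input.value)
       // `update` is optional on both sides, so set it only when present
       input.update.foreach(builder.setUpdate)
       builder.build()
     }
   
     def deserialize(binary: StoreTypes.AccumulableInfo): AccumulableInfo = {
       new AccumulableInfo(
         id = binary.getId,
         name = binary.getName,
         update = if (binary.hasUpdate) Some(binary.getUpdate) else None,
         value = binary.getValue)
     }
   }
   ```
   
   Then both `TaskDataWrapperSerializer` and `StageDataWrapperSerializer` could call the shared object instead of one holding an instance of the other.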



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

