You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/18 05:33:59 UTC

[GitHub] [spark] techaddict opened a new pull request, #39110: [SPARK-41429] Protobuf serializer for RDDOperationGraphWrapper

techaddict opened a new pull request, #39110:
URL: https://github.com/apache/spark/pull/39110

   ### What changes were proposed in this pull request?
   Add Protobuf serializer for RDDOperationGraphWrapper
   
   ### Why are the changes needed?
   Support fast and compact serialization/deserialization for RDDOperationGraphWrapper over RocksDB.
   
   ### Does this PR introduce any user-facing change?
   No
   
   ### How was this patch tested?
   New UT


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1057081557


##########
core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    serialize(input.asInstanceOf[RDDOperationGraphWrapper])
+  }
+
+  private def serialize(input: RDDOperationGraphWrapper): Array[Byte] = {
+    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
+    builder.setStageId(input.stageId)
+    input.edges.foreach { e =>
+      builder.addEdges(serializeRDDOperationEdge(e))
+    }
+    input.outgoingEdges.foreach { e =>
+      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
+    }
+    input.incomingEdges.foreach { e =>
+      builder.addIncomingEdges(serializeRDDOperationEdge(e))
+    }
+    builder.setRootCluster(serializeRDDOperationClusterWrapper(input.rootCluster))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {
+    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
+    new RDDOperationGraphWrapper(
+      stageId = wrapper.getStageId,
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      outgoingEdges = wrapper.getOutgoingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      incomingEdges = wrapper.getIncomingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      rootCluster = deserializeRDDOperationClusterWrapper(wrapper.getRootCluster)
+    )
+  }
+
+  private def serializeRDDOperationClusterWrapper(op: RDDOperationClusterWrapper):
+    StoreTypes.RDDOperationClusterWrapper = {
+    val builder = StoreTypes.RDDOperationClusterWrapper.newBuilder()
+    builder.setId(op.id)
+    builder.setName(op.name)
+    op.childNodes.foreach { node =>
+      builder.addChildNodes(serializeRDDOperationNode(node))
+    }
+    op.childClusters.foreach { cluster =>
+      builder.addChildClusters(serializeRDDOperationClusterWrapper(cluster))
+    }
+    builder.build()
+  }
+
+  private def deserializeRDDOperationClusterWrapper(op: StoreTypes.RDDOperationClusterWrapper):
+    RDDOperationClusterWrapper = {
+    new RDDOperationClusterWrapper(
+      id = op.getId,
+      name = op.getName,
+      childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode).toSeq,
+      childClusters =
+        op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper).toSeq
+    )
+  }
+
+  private def serializeRDDOperationNode(node: RDDOperationNode): StoreTypes.RDDOperationNode = {
+    val builder = StoreTypes.RDDOperationNode.newBuilder()
+    builder.setId(node.id)
+    builder.setName(node.name)
+    builder.setCached(node.cached)
+    builder.setBarrier(node.barrier)
+    builder.setCallsite(node.callsite)
+    builder.setOutputDeterministicLevel(serializeDeterministicLevel(node.outputDeterministicLevel))
+    builder.build()
+  }
+
+  private def deserializeRDDOperationNode(node: StoreTypes.RDDOperationNode): RDDOperationNode = {
+    RDDOperationNode(
+      id = node.getId,
+      name = node.getName,
+      cached = node.getCached,
+      barrier = node.getBarrier,
+      callsite = node.getCallsite,
+      outputDeterministicLevel = DeterministicLevel(node.getOutputDeterministicLevel.getNumber)
+    )
+  }
+
+  private def serializeRDDOperationEdge(edge: RDDOperationEdge): StoreTypes.RDDOperationEdge = {
+    val builder = StoreTypes.RDDOperationEdge.newBuilder()
+    builder.setFromId(edge.fromId)
+    builder.setToId(edge.toId)
+    builder.build()
+  }
+
+  private def deserializeRDDOperationEdge(edge: StoreTypes.RDDOperationEdge): RDDOperationEdge = {
+    RDDOperationEdge(
+      fromId = edge.getFromId,
+      toId = edge.getToId)
+  }
+
+  private def serializeDeterministicLevel(d: DeterministicLevel.Value):

Review Comment:
   Only used in one place; maybe inlining it is OK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] techaddict commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
techaddict commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1057866682


##########
core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    serialize(input.asInstanceOf[RDDOperationGraphWrapper])
+  }
+
+  private def serialize(input: RDDOperationGraphWrapper): Array[Byte] = {
+    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
+    builder.setStageId(input.stageId)
+    input.edges.foreach { e =>
+      builder.addEdges(serializeRDDOperationEdge(e))
+    }
+    input.outgoingEdges.foreach { e =>
+      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
+    }
+    input.incomingEdges.foreach { e =>
+      builder.addIncomingEdges(serializeRDDOperationEdge(e))
+    }
+    builder.setRootCluster(serializeRDDOperationClusterWrapper(input.rootCluster))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {
+    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
+    new RDDOperationGraphWrapper(
+      stageId = wrapper.getStageId,
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      outgoingEdges = wrapper.getOutgoingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      incomingEdges = wrapper.getIncomingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      rootCluster = deserializeRDDOperationClusterWrapper(wrapper.getRootCluster)
+    )
+  }
+
+  private def serializeRDDOperationClusterWrapper(op: RDDOperationClusterWrapper):
+    StoreTypes.RDDOperationClusterWrapper = {
+    val builder = StoreTypes.RDDOperationClusterWrapper.newBuilder()
+    builder.setId(op.id)
+    builder.setName(op.name)
+    op.childNodes.foreach { node =>
+      builder.addChildNodes(serializeRDDOperationNode(node))
+    }
+    op.childClusters.foreach { cluster =>
+      builder.addChildClusters(serializeRDDOperationClusterWrapper(cluster))
+    }
+    builder.build()
+  }
+
+  private def deserializeRDDOperationClusterWrapper(op: StoreTypes.RDDOperationClusterWrapper):
+    RDDOperationClusterWrapper = {
+    new RDDOperationClusterWrapper(
+      id = op.getId,
+      name = op.getName,
+      childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode).toSeq,
+      childClusters =
+        op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper).toSeq
+    )
+  }
+
+  private def serializeRDDOperationNode(node: RDDOperationNode): StoreTypes.RDDOperationNode = {
+    val builder = StoreTypes.RDDOperationNode.newBuilder()
+    builder.setId(node.id)
+    builder.setName(node.name)
+    builder.setCached(node.cached)
+    builder.setBarrier(node.barrier)
+    builder.setCallsite(node.callsite)
+    builder.setOutputDeterministicLevel(serializeDeterministicLevel(node.outputDeterministicLevel))
+    builder.build()
+  }
+
+  private def deserializeRDDOperationNode(node: StoreTypes.RDDOperationNode): RDDOperationNode = {
+    RDDOperationNode(
+      id = node.getId,
+      name = node.getName,
+      cached = node.getCached,
+      barrier = node.getBarrier,
+      callsite = node.getCallsite,
+      outputDeterministicLevel = DeterministicLevel(node.getOutputDeterministicLevel.getNumber)
+    )
+  }
+
+  private def serializeRDDOperationEdge(edge: RDDOperationEdge): StoreTypes.RDDOperationEdge = {
+    val builder = StoreTypes.RDDOperationEdge.newBuilder()
+    builder.setFromId(edge.fromId)
+    builder.setToId(edge.toId)
+    builder.build()
+  }
+
+  private def deserializeRDDOperationEdge(edge: StoreTypes.RDDOperationEdge): RDDOperationEdge = {
+    RDDOperationEdge(
+      fromId = edge.getFromId,
+      toId = edge.getToId)
+  }
+
+  private def serializeDeterministicLevel(d: DeterministicLevel.Value):

Review Comment:
   DeterministicLevel is used in multiple places; we might need to re-use it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] techaddict commented on pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
techaddict commented on PR #39110:
URL: https://github.com/apache/spark/pull/39110#issuecomment-1366384358

   @gengliangwang addressed all the comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1058034241


##########
core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    serialize(input.asInstanceOf[RDDOperationGraphWrapper])
+  }
+
+  private def serialize(input: RDDOperationGraphWrapper): Array[Byte] = {
+    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
+    builder.setStageId(input.stageId)
+    input.edges.foreach { e =>
+      builder.addEdges(serializeRDDOperationEdge(e))
+    }
+    input.outgoingEdges.foreach { e =>
+      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
+    }
+    input.incomingEdges.foreach { e =>
+      builder.addIncomingEdges(serializeRDDOperationEdge(e))
+    }
+    builder.setRootCluster(serializeRDDOperationClusterWrapper(input.rootCluster))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {
+    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
+    new RDDOperationGraphWrapper(
+      stageId = wrapper.getStageId,
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,

Review Comment:
   https://github.com/apache/spark/pull/39215/files is doing some refactoring work, which does not block this one.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] techaddict commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
techaddict commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1057867101


##########
core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    serialize(input.asInstanceOf[RDDOperationGraphWrapper])
+  }
+
+  private def serialize(input: RDDOperationGraphWrapper): Array[Byte] = {
+    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
+    builder.setStageId(input.stageId)
+    input.edges.foreach { e =>
+      builder.addEdges(serializeRDDOperationEdge(e))
+    }
+    input.outgoingEdges.foreach { e =>
+      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
+    }
+    input.incomingEdges.foreach { e =>
+      builder.addIncomingEdges(serializeRDDOperationEdge(e))
+    }
+    builder.setRootCluster(serializeRDDOperationClusterWrapper(input.rootCluster))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {
+    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
+    new RDDOperationGraphWrapper(
+      stageId = wrapper.getStageId,
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,

Review Comment:
   @LuciferYang Agreed, and since these are private[spark], it shouldn't be an issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] techaddict commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
techaddict commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1057953155


##########
core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto:
##########
@@ -390,3 +390,38 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message RDDOperationEdge {
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+enum DeterministicLevel {
+  DETERMINATE = 0;

Review Comment:
   Makes sense, but I'm getting this error:
   > store_types.proto:400:3: "UNSPECIFIED" is already defined in "org.apache.spark.status.protobuf".
   > 
   > 
   > store_types.proto:400:3: Note that enum values use C++ scoping rules, meaning that enum values are siblings of their type, not children of it.  Therefore, "UNSPECIFIED" must be unique within "org.apache.spark.status.protobuf", not just within "DeterministicLevel".
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1057082842


##########
core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto:
##########
@@ -390,3 +390,38 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message RDDOperationEdge {
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+enum DeterministicLevel {
+  DETERMINATE = 0;
+  UNORDERED = 1;
+  INDETERMINATE = 2;
+}
+
+message RDDOperationNode {
+  int32 id = 1;
+  string name = 2;
+  bool cached = 3;
+  bool barrier = 4;
+  string callsite = 5;
+  DeterministicLevel output_deterministic_level = 6;
+}
+
+message RDDOperationClusterWrapper {
+  string id = 1;
+  string name = 2;
+  repeated RDDOperationNode child_nodes = 3;
+  repeated RDDOperationClusterWrapper child_clusters = 4;
+}
+
+message RDDOperationGraphWrapper {
+  int32 stage_id = 1;

Review Comment:
   This should follow the other places: `int32 stage_id` -> `int64 stage_id`.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang closed pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
gengliangwang closed pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper
URL: https://github.com/apache/spark/pull/39110


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] techaddict commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
techaddict commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1058682080


##########
core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto:
##########
@@ -390,3 +390,38 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message RDDOperationEdge {
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+message RDDOperationNode {
+  int32 id = 1;
+  string name = 2;
+  bool cached = 3;
+  bool barrier = 4;
+  string callsite = 5;
+  RDDOperationGraphWrapper.DeterministicLevel output_deterministic_level = 6;
+}
+
+message RDDOperationClusterWrapper {
+  string id = 1;
+  string name = 2;
+  repeated RDDOperationNode child_nodes = 3;
+  repeated RDDOperationClusterWrapper child_clusters = 4;
+}
+
+message RDDOperationGraphWrapper {
+  int64 stage_id = 1;
+  repeated RDDOperationEdge edges = 2;
+  repeated RDDOperationEdge outgoing_edges = 3;
+  repeated RDDOperationEdge incoming_edges = 4;
+  RDDOperationClusterWrapper root_cluster = 5;
+  enum DeterministicLevel {

Review Comment:
   @LuciferYang @gengliangwang done 👍🏼 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1058000402


##########
core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto:
##########
@@ -390,3 +390,38 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message RDDOperationEdge {
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+enum DeterministicLevel {
+  DETERMINATE = 0;

Review Comment:
   Try moving the enum into RDDOperationGraphWrapper?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1057942669


##########
core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    serialize(input.asInstanceOf[RDDOperationGraphWrapper])
+  }
+
+  private def serialize(input: RDDOperationGraphWrapper): Array[Byte] = {
+    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
+    builder.setStageId(input.stageId)
+    input.edges.foreach { e =>
+      builder.addEdges(serializeRDDOperationEdge(e))
+    }
+    input.outgoingEdges.foreach { e =>
+      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
+    }
+    input.incomingEdges.foreach { e =>
+      builder.addIncomingEdges(serializeRDDOperationEdge(e))
+    }
+    builder.setRootCluster(serializeRDDOperationClusterWrapper(input.rootCluster))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {
+    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
+    new RDDOperationGraphWrapper(
+      stageId = wrapper.getStageId,
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,

Review Comment:
   > explicitly defining them as scala.collection.Seq to make no performance difference
   
   @LuciferYang could you explain the details?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1057079792


##########
core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    serialize(input.asInstanceOf[RDDOperationGraphWrapper])
+  }
+
+  private def serialize(input: RDDOperationGraphWrapper): Array[Byte] = {
+    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
+    builder.setStageId(input.stageId)
+    input.edges.foreach { e =>
+      builder.addEdges(serializeRDDOperationEdge(e))
+    }
+    input.outgoingEdges.foreach { e =>
+      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
+    }
+    input.incomingEdges.foreach { e =>
+      builder.addIncomingEdges(serializeRDDOperationEdge(e))
+    }
+    builder.setRootCluster(serializeRDDOperationClusterWrapper(input.rootCluster))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {
+    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
+    new RDDOperationGraphWrapper(
+      stageId = wrapper.getStageId,
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,

Review Comment:
   I found that there are many redundant `toSeq` calls (redundant for Scala 2.12) in the `status.protobuf` package. They exist for Scala 2.13 compatibility, because `Seq` means `collection.Seq` in Scala 2.12 but `immutable.Seq` in Scala 2.13.
   
   This conversion has no effect on Scala 2.12, but it makes Scala 2.13 perform worse than Scala 2.12, since `toSeq` copies into an immutable collection there. Since these are internal definitions in Spark, I suggest explicitly declaring them as `scala.collection.Seq` so that there is no performance difference between Scala 2.12 and Scala 2.13. Do you think that's OK? @gengliangwang 
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1058072078


##########
core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto:
##########
@@ -390,3 +390,38 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message RDDOperationEdge {
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+message RDDOperationNode {
+  int32 id = 1;
+  string name = 2;
+  bool cached = 3;
+  bool barrier = 4;
+  string callsite = 5;
+  RDDOperationGraphWrapper.DeterministicLevel output_deterministic_level = 6;
+}
+
+message RDDOperationClusterWrapper {
+  string id = 1;
+  string name = 2;
+  repeated RDDOperationNode child_nodes = 3;
+  repeated RDDOperationClusterWrapper child_clusters = 4;
+}
+
+message RDDOperationGraphWrapper {
+  int64 stage_id = 1;
+  repeated RDDOperationEdge edges = 2;
+  repeated RDDOperationEdge outgoing_edges = 3;
+  repeated RDDOperationEdge incoming_edges = 4;
+  RDDOperationClusterWrapper root_cluster = 5;
+  enum DeterministicLevel {

Review Comment:
   If `DeterministicLevel` is only used by `RDDOperationNode`, should we move it into `RDDOperationNode`? Will `DeterministicLevel` be used elsewhere in the future?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] techaddict commented on pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
techaddict commented on PR #39110:
URL: https://github.com/apache/spark/pull/39110#issuecomment-1367016502

   @gengliangwang updated the PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1057938753


##########
core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto:
##########
@@ -390,3 +390,38 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message RDDOperationEdge {
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+enum DeterministicLevel {
+  DETERMINATE = 0;

Review Comment:
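   Let's add `UNSPECIFIED` as enum value 0. In proto3 an unset enum field reads back as 0, so an unset level would otherwise be indistinguishable from `DETERMINATE`.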
   See discussion in https://github.com/apache/spark/pull/38779#discussion_r1032062307



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1058649331


##########
core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto:
##########
@@ -390,3 +390,38 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message RDDOperationEdge {
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+message RDDOperationNode {
+  int32 id = 1;
+  string name = 2;
+  bool cached = 3;
+  bool barrier = 4;
+  string callsite = 5;
+  RDDOperationGraphWrapper.DeterministicLevel output_deterministic_level = 6;
+}
+
+message RDDOperationClusterWrapper {
+  string id = 1;
+  string name = 2;
+  repeated RDDOperationNode child_nodes = 3;
+  repeated RDDOperationClusterWrapper child_clusters = 4;
+}
+
+message RDDOperationGraphWrapper {
+  int64 stage_id = 1;
+  repeated RDDOperationEdge edges = 2;
+  repeated RDDOperationEdge outgoing_edges = 3;
+  repeated RDDOperationEdge incoming_edges = 4;
+  RDDOperationClusterWrapper root_cluster = 5;
+  enum DeterministicLevel {

Review Comment:
   +1, makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on PR #39110:
URL: https://github.com/apache/spark/pull/39110#issuecomment-1367650840

   Thanks, merging to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] techaddict commented on pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
techaddict commented on PR #39110:
URL: https://github.com/apache/spark/pull/39110#issuecomment-1364435841

   @gengliangwang @LuciferYang I rebased this one and it's ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on PR #39110:
URL: https://github.com/apache/spark/pull/39110#issuecomment-1366980316

   @techaddict can you resolve the conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1057939206


##########
core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    serialize(input.asInstanceOf[RDDOperationGraphWrapper])

Review Comment:
   Let's avoid creating two overloaded `serialize` methods that differ only in their parameter type...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #39110: [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #39110:
URL: https://github.com/apache/spark/pull/39110#discussion_r1058034241


##########
core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    serialize(input.asInstanceOf[RDDOperationGraphWrapper])
+  }
+
+  private def serialize(input: RDDOperationGraphWrapper): Array[Byte] = {
+    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
+    builder.setStageId(input.stageId)
+    input.edges.foreach { e =>
+      builder.addEdges(serializeRDDOperationEdge(e))
+    }
+    input.outgoingEdges.foreach { e =>
+      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
+    }
+    input.incomingEdges.foreach { e =>
+      builder.addIncomingEdges(serializeRDDOperationEdge(e))
+    }
+    builder.setRootCluster(serializeRDDOperationClusterWrapper(input.rootCluster))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {
+    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
+    new RDDOperationGraphWrapper(
+      stageId = wrapper.getStageId,
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,

Review Comment:
   https://github.com/apache/spark/pull/39215/files does some refactoring work; it does not block the current PR.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org