You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/28 15:00:52 UTC

[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #4390: Move physical plan serde from Ballista to DataFusion

andygrove commented on code in PR #4390:
URL: https://github.com/apache/arrow-datafusion/pull/4390#discussion_r1033652851


##########
datafusion/proto/proto/datafusion.proto:
##########
@@ -915,3 +915,470 @@ message StringifiedPlan {
   PlanType plan_type = 1;
   string plan = 2;
 }
+
+/////////////////////////////////////////////////////////////////////////////////////////////////
+
+// PhysicalPlanNode is a nested type: it wraps exactly one physical operator,
+// selected by the PhysicalPlanType oneof. Operators that have inputs embed
+// further PhysicalPlanNode values (e.g. FilterExecNode.input), so an entire
+// plan tree is encoded as one recursive message.
+// NOTE(review): most protobuf decoders enforce a recursion-depth limit, so
+// very deep plans may fail to decode — confirm the limit used by the reader.
+message PhysicalPlanNode {
+  // Field number 5 is skipped in the oneof below. Reserve it so it can never
+  // be assigned to a new operator with a different meaning later.
+  reserved 5;
+
+  oneof PhysicalPlanType {
+    ParquetScanExecNode parquet_scan = 1;
+    CsvScanExecNode csv_scan = 2;
+    EmptyExecNode empty = 3;
+    ProjectionExecNode projection = 4;
+    GlobalLimitExecNode global_limit = 6;
+    LocalLimitExecNode local_limit = 7;
+    AggregateExecNode aggregate = 8;
+    HashJoinExecNode hash_join = 9;
+    SortExecNode sort = 10;
+    CoalesceBatchesExecNode coalesce_batches = 11;
+    FilterExecNode filter = 12;
+    CoalescePartitionsExecNode merge = 13;
+    RepartitionExecNode repartition = 14;
+    WindowAggExecNode window = 15;
+    CrossJoinExecNode cross_join = 16;
+    AvroScanExecNode avro_scan = 17;
+    PhysicalExtensionNode extension = 18;
+    UnionExecNode union = 19;
+    ExplainExecNode explain = 20;
+    SortPreservingMergeExecNode sort_preserving_merge = 21;
+  }
+}
+
+// A physical operator that is not one of the built-in PhysicalPlanNode
+// variants. The schema treats its payload as opaque bytes.
+message PhysicalExtensionNode {
+  // Serialized operator payload. NOTE(review): the encoding is not defined
+  // by this schema; presumably it is produced and consumed by an extension
+  // codec on the Rust side — confirm.
+  bytes node = 1;
+  // Child plans of the extension operator.
+  repeated PhysicalPlanNode inputs = 2;
+}
+
+// physical expressions
+//
+// PhysicalExprNode wraps exactly one physical expression, selected by the
+// ExprType oneof. Many variants embed further PhysicalExprNode values (e.g.
+// PhysicalBinaryExprNode.l / .r), so an expression tree is one recursive
+// message.
+message PhysicalExprNode {
+  oneof ExprType {
+    // column references
+    PhysicalColumn column = 1;
+
+    // Constant value, encoded with the shared datafusion.ScalarValue message.
+    datafusion.ScalarValue literal = 2;
+
+    // binary expressions
+    PhysicalBinaryExprNode binary_expr = 3;
+
+    // aggregate expressions
+    PhysicalAggregateExprNode aggregate_expr = 4;
+
+    // null checks
+    PhysicalIsNull is_null_expr = 5;
+    PhysicalIsNotNull is_not_null_expr = 6;
+    PhysicalNot not_expr = 7;
+
+    // NOTE(review): the trailing underscore presumably avoids clashing with
+    // the `case` keyword in generated code — confirm before renaming.
+    PhysicalCaseNode case_ = 8;
+    PhysicalCastNode cast = 9;
+    PhysicalSortExprNode sort = 10;
+    PhysicalNegativeNode negative = 11;
+    PhysicalInListNode in_list = 12;
+    PhysicalScalarFunctionNode scalar_function = 13;
+    PhysicalTryCastNode try_cast = 14;
+
+    // window expressions
+    PhysicalWindowExprNode window_expr = 15;
+
+    // Scalar user-defined function call.
+    PhysicalScalarUdfNode scalar_udf = 16;
+
+    // Same operand shape as binary_expr, kept as a separate variant.
+    PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;
+  }
+}
+
+// Call of a scalar user-defined function. Only the function's `name`, its
+// arguments and its result type are serialized; no function body is carried.
+message PhysicalScalarUdfNode {
+  // Field number 3 is skipped. Reserve it so it cannot later be reused with
+  // a different meaning.
+  reserved 3;
+
+  // Name of the UDF. NOTE(review): presumably resolved against a function
+  // registry by the decoder — confirm.
+  string name = 1;
+  // Argument expressions, in call order.
+  repeated PhysicalExprNode args = 2;
+  // Arrow data type produced by the call.
+  datafusion.ArrowType return_type = 4;
+}
+
+// Call of a built-in aggregate function. Only functions listed in the
+// datafusion.AggregateFunction enum can be encoded here; there is no field
+// for a user-defined aggregate.
+message PhysicalAggregateExprNode {
+  datafusion.AggregateFunction aggr_function = 1;
+  // Argument expressions, in order.
+  repeated PhysicalExprNode expr = 2;
+  // NOTE(review): presumably true for DISTINCT aggregates — confirm.
+  // proto3 default is false when unset.
+  bool distinct = 3;
+}
+
+// A window function call. The function is either a built-in aggregate or a
+// built-in window function, and it is applied to `expr`.
+message PhysicalWindowExprNode {
+  oneof window_function {
+    datafusion.AggregateFunction aggr_function = 1;
+    datafusion.BuiltInWindowFunction built_in_function = 2;
+    // udaf = 3
+    // NOTE(review): field number 3 appears to be held back for a future
+    // user-defined aggregate variant; do not assign it to anything else.
+  }
+  // Argument expression. Only a single argument can be carried here.
+  PhysicalExprNode expr = 4;
+}
+
+// IS NULL check; `expr` is the operand being tested.
+message PhysicalIsNull {
+  PhysicalExprNode expr = 1;
+}
+
+// IS NOT NULL check; `expr` is the operand being tested.
+message PhysicalIsNotNull {
+  PhysicalExprNode expr = 1;
+}
+
+// Logical NOT applied to `expr`.
+message PhysicalNot {
+  PhysicalExprNode expr = 1;
+}
+
+// An expression paired with an alias name.
+// NOTE(review): this message is not a variant of PhysicalExprNode's ExprType
+// oneof; confirm whether anything still references it.
+message PhysicalAliasNode {
+  PhysicalExprNode expr = 1;
+  string alias = 2;
+}
+
+// Binary operation `l <op> r`.
+message PhysicalBinaryExprNode {
+  // Left operand.
+  PhysicalExprNode l = 1;
+  // Right operand.
+  PhysicalExprNode r = 2;
+  // Operator, carried as free-form text rather than an enum, so the schema
+  // does not constrain its values. NOTE(review): presumably the operator's
+  // name as understood by the Rust decoder — confirm.
+  string op = 3;
+}
+
+// Date/time-interval arithmetic `l <op> r`. It has the same field layout as
+// PhysicalBinaryExprNode but is a distinct expression variant.
+message PhysicalDateTimeIntervalExprNode {
+  // Left operand.
+  PhysicalExprNode l = 1;
+  // Right operand.
+  PhysicalExprNode r = 2;
+  // Operator as free-form text; the schema does not constrain its values.
+  string op = 3;
+}
+
+// Sort key: an expression plus its ordering options.
+message PhysicalSortExprNode {
+  PhysicalExprNode expr = 1;
+  // proto3 bools have no presence and default to false, so an unset field
+  // decodes as descending.
+  bool asc = 2;
+  // Likewise, unset decodes as false (nulls not first).
+  bool nulls_first = 3;
+}
+
+// One WHEN ... THEN ... arm of a CASE expression.
+message PhysicalWhenThen {
+  PhysicalExprNode when_expr = 1;
+  PhysicalExprNode then_expr = 2;
+}
+
+// Membership test of `expr` against the values in `list`.
+message PhysicalInListNode {
+  PhysicalExprNode expr = 1;
+  // Candidate values, in order.
+  repeated PhysicalExprNode list = 2;
+  // Set for the negated form (NOT IN); defaults to false when unset.
+  bool negated = 3;
+}
+
+// CASE expression.
+message PhysicalCaseNode {
+  // Optional operand (message fields have presence). NOTE(review): presumably
+  // unset for the searched form `CASE WHEN cond THEN ...` — confirm.
+  PhysicalExprNode expr = 1;
+  // WHEN/THEN arms, in evaluation order.
+  repeated PhysicalWhenThen when_then_expr = 2;
+  // ELSE branch; may be unset.
+  PhysicalExprNode else_expr = 3;
+}
+
+// Call of a built-in scalar function.
+message PhysicalScalarFunctionNode {
+  // NOTE(review): both `name` and `fun` identify the function; confirm which
+  // one the decoder relies on.
+  string name = 1;
+  datafusion.ScalarFunction fun = 2;
+  // Argument expressions, in call order.
+  repeated PhysicalExprNode args = 3;
+  // Arrow data type produced by the call.
+  datafusion.ArrowType return_type = 4;
+}
+
+// TRY_CAST of `expr` to `arrow_type`.
+message PhysicalTryCastNode {
+  PhysicalExprNode expr = 1;
+  // Target type.
+  datafusion.ArrowType arrow_type = 2;
+}
+
+// CAST of `expr` to `arrow_type`.
+message PhysicalCastNode {
+  PhysicalExprNode expr = 1;
+  // Target type.
+  datafusion.ArrowType arrow_type = 2;
+}
+
+// Arithmetic negation of `expr`.
+message PhysicalNegativeNode {
+  PhysicalExprNode expr = 1;
+}
+
+// Placeholder for a shuffle whose input locations are not yet known.
+// NOTE(review): this message is not a variant of PhysicalPlanNode's oneof,
+// and its stage/shuffle fields look Ballista-specific (like
+// ShuffleWriterExecNode). Confirm whether it belongs in datafusion.proto at
+// all.
+message UnresolvedShuffleExecNode {
+  uint32 stage_id = 1;
+  datafusion.Schema schema = 2;
+  uint32 input_partition_count = 3;
+  uint32 output_partition_count = 4;
+}
+
+// Filter operator: rows of `input` are kept according to predicate `expr`.
+message FilterExecNode {
+  PhysicalPlanNode input = 1;
+  // Predicate expression.
+  PhysicalExprNode expr = 2;
+}
+
+// A group of files handled together by a file scan. PartitionedFile is
+// defined outside this excerpt.
+message FileGroup {
+  repeated PartitionedFile files = 1;
+}
+
+// Optional row limit for a scan.
+message ScanLimit {
+  // wrap into a message to make it optional
+  // (message-typed fields have explicit presence in proto3, so an unset
+  // ScanLimit is distinguishable from `limit = 0`)
+  uint32 limit = 1;
+}
+
+// Configuration shared by the file-based scan operators (Parquet, CSV, Avro).
+message FileScanExecConf {
+  // Field number 3 is skipped. Reserve it so it cannot later be reused with
+  // a different meaning.
+  reserved 3;
+
+  // Files to scan, grouped.
+  repeated FileGroup file_groups = 1;
+  datafusion.Schema schema = 2;
+  // Column indices to read. Repeated fields have no presence, so an empty
+  // list is indistinguishable from an unset one. NOTE(review): presumably
+  // empty means "no projection" — confirm.
+  repeated uint32 projection = 4;
+  // Unset means no limit.
+  ScanLimit limit = 5;
+  // Statistics is defined outside this excerpt.
+  Statistics statistics = 6;
+  repeated string table_partition_cols = 7;
+  string object_store_url = 8;
+}
+
+// Parquet file scan.
+message ParquetScanExecNode {
+  FileScanExecConf base_conf = 1;
+  // NOTE(review): the pruning predicate is carried as a logical expression
+  // (datafusion.LogicalExprNode), not a PhysicalExprNode; may be unset.
+  datafusion.LogicalExprNode pruning_predicate = 2;
+}
+
+// CSV file scan.
+message CsvScanExecNode {
+  FileScanExecConf base_conf = 1;
+  // Defaults to false when unset.
+  bool has_header = 2;
+  // NOTE(review): presumably a single-character delimiter; the schema does
+  // not enforce its length — confirm the decoder validates it.
+  string delimiter = 3;
+}
+
+// Avro file scan; it needs only the shared file-scan configuration.
+message AvroScanExecNode {
+  FileScanExecConf base_conf = 1;
+}
+
+// Partitioning strategy of a hash join.
+// NOTE(review): the zero value is a real mode, so an unset field decodes as
+// COLLECT_LEFT. The values are also unprefixed; they share the package-level
+// enum-value namespace with other top-level enums (e.g. AggregateMode), so new
+// names must not clash. Renaming now would break generated code.
+enum PartitionMode {
+  COLLECT_LEFT = 0;
+  PARTITIONED = 1;
+  AUTO = 2;
+}
+
+// Hash join of `left` and `right`.
+message HashJoinExecNode {
+  // Field number 5 is skipped. Reserve it so it cannot later be reused with
+  // a different meaning.
+  reserved 5;
+
+  PhysicalPlanNode left = 1;
+  PhysicalPlanNode right = 2;
+  // Equi-join key pairs (left column, right column).
+  repeated JoinOn on = 3;
+  datafusion.JoinType join_type = 4;
+  // Unset decodes as COLLECT_LEFT (the enum's zero value).
+  PartitionMode partition_mode = 6;
+  // Defaults to false when unset.
+  bool null_equals_null = 7;
+  // Optional extra join filter; JoinFilter is defined outside this excerpt.
+  JoinFilter filter = 8;
+}
+
+// Union of all `inputs` plans.
+message UnionExecNode {
+  repeated PhysicalPlanNode inputs = 1;
+}
+
+// EXPLAIN operator: carries already-stringified plans rather than plan nodes.
+message ExplainExecNode {
+  datafusion.Schema schema = 1;
+  repeated datafusion.StringifiedPlan stringified_plans = 2;
+  // Defaults to false when unset.
+  bool verbose = 3;
+}
+
+// Cross join of `left` and `right`; it has no join keys or filter fields.
+message CrossJoinExecNode {
+  PhysicalPlanNode left = 1;
+  PhysicalPlanNode right = 2;
+}
+
+// Reference to a column by name and index.
+message PhysicalColumn {
+  string name = 1;
+  // NOTE(review): presumably the column's position in the input schema —
+  // confirm.
+  uint32 index = 2;
+}
+
+// One equi-join key pair: a column from the left input and one from the right.
+message JoinOn {
+  PhysicalColumn left = 1;
+  PhysicalColumn right = 2;
+}
+
+// Operator with no input plan.
+message EmptyExecNode {
+  // Defaults to false when unset. NOTE(review): presumably controls whether
+  // a single row is emitted instead of none — confirm.
+  bool produce_one_row = 1;
+  datafusion.Schema schema = 2;
+}
+
+// Projection of `input` onto the expressions in `expr`.
+// NOTE(review): `expr` and `expr_name` are parallel arrays, presumably
+// matched by position; the schema does not enforce equal lengths.
+message ProjectionExecNode {
+  PhysicalPlanNode input = 1;
+  repeated PhysicalExprNode expr = 2;
+  repeated string expr_name = 3;
+}
+
+// Execution mode of an aggregate operator.
+// NOTE(review): the zero value is a real mode, so an unset field decodes as
+// PARTIAL. The values are unprefixed; renaming now would break generated code.
+enum AggregateMode {
+  PARTIAL = 0;
+  FINAL = 1;
+  FINAL_PARTITIONED = 2;
+}
+
+// Window aggregation over `input`.
+// NOTE(review): `window_expr` and `window_expr_name` are parallel arrays,
+// presumably matched by position.
+message WindowAggExecNode {
+  PhysicalPlanNode input = 1;
+  repeated PhysicalExprNode window_expr = 2;
+  repeated string window_expr_name = 3;
+  datafusion.Schema input_schema = 4;
+}
+
+// Aggregate operator over `input`.
+// NOTE(review): group_expr/group_expr_name and aggr_expr/aggr_expr_name are
+// parallel arrays, presumably matched by position; the schema does not
+// enforce equal lengths.
+message AggregateExecNode {
+  repeated PhysicalExprNode group_expr = 1;
+  repeated PhysicalExprNode aggr_expr = 2;
+  // Unset decodes as PARTIAL (the enum's zero value).
+  AggregateMode mode = 3;
+  PhysicalPlanNode input = 4;
+  repeated string group_expr_name = 5;
+  repeated string aggr_expr_name = 6;
+  // we need the input schema to the partial aggregate to pass to the final aggregate
+  datafusion.Schema input_schema = 7;
+  // NOTE(review): `null_expr` and `groups` presumably encode grouping sets
+  // (with `groups` a flattened boolean matrix) — confirm the layout.
+  repeated PhysicalExprNode null_expr = 8;
+  repeated bool groups = 9;
+}
+
+// Shuffle-writer operator for a job stage.
+// NOTE(review): flagged in review as Ballista-specific. It is also not a
+// variant of PhysicalPlanNode's oneof, so it likely belongs in Ballista's own
+// proto rather than datafusion.proto.
+message ShuffleWriterExecNode {
+  //TODO it seems redundant to provide job and stage id here since we also have them
+  // in the TaskDefinition that wraps this plan
+  string job_id = 1;
+  uint32 stage_id = 2;
+  PhysicalPlanNode input = 3;
+  // PhysicalHashRepartition is defined outside this excerpt.
+  PhysicalHashRepartition output_partitioning = 4;
+}

Review Comment:
   This operator is specific to Ballista, so it should stay in Ballista rather than move to DataFusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org