You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/29 16:08:18 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4390: Move physical plan serde from Ballista to DataFusion

alamb commented on code in PR #4390:
URL: https://github.com/apache/arrow-datafusion/pull/4390#discussion_r1034953748


##########
datafusion/proto/src/physical_plan/from_proto.rs:
##########
@@ -0,0 +1,428 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Serde code to convert from protocol buffers to Rust data structures.
+
+use crate::protobuf;
+use chrono::TimeZone;
+use chrono::Utc;
+use datafusion::arrow::datatypes::Schema;
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::execution::context::ExecutionProps;
+use datafusion::execution::FunctionRegistry;
+use datafusion::logical_expr::window_function::WindowFunction;
+use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
+use datafusion::physical_expr::ScalarFunctionExpr;
+use datafusion::physical_plan::file_format::FileScanConfig;
+use datafusion::physical_plan::{
+    expressions::{
+        BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
+        Literal, NegativeExpr, NotExpr, TryCastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
+    },
+    functions, Partitioning,
+};
+use datafusion::physical_plan::{ColumnStatistics, PhysicalExpr, Statistics};
+use datafusion_common::DataFusionError;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use std::convert::{TryFrom, TryInto};
+use std::ops::Deref;
+use std::sync::Arc;
+
+use crate::common::proto_error;
+use crate::convert_required;
+use crate::from_proto::from_proto_binary_op;
+use crate::protobuf::physical_expr_node::ExprType;
+use crate::protobuf::JoinSide;
+use parking_lot::RwLock;
+
+impl From<&protobuf::PhysicalColumn> for Column {
+    fn from(c: &protobuf::PhysicalColumn) -> Column {
+        Column::new(&c.name, c.index as usize)
+    }
+}
+
+pub(crate) fn parse_physical_expr(
+    proto: &protobuf::PhysicalExprNode,
+    registry: &dyn FunctionRegistry,
+    input_schema: &Schema,
+) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
+    let expr_type = proto
+        .expr_type
+        .as_ref()
+        .ok_or_else(|| proto_error("Unexpected empty physical expression"))?;
+
+    let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
+        ExprType::Column(c) => {
+            let pcol: Column = c.into();
+            Arc::new(pcol)
+        }
+        ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
+        ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
+            parse_required_physical_box_expr(
+                &binary_expr.l,
+                registry,
+                "left",
+                input_schema,
+            )?,
+            from_proto_binary_op(&binary_expr.op)?,
+            parse_required_physical_box_expr(
+                &binary_expr.r,
+                registry,
+                "right",
+                input_schema,
+            )?,
+        )),
+        ExprType::DateTimeIntervalExpr(expr) => Arc::new(DateTimeIntervalExpr::try_new(
+            parse_required_physical_box_expr(&expr.l, registry, "left", input_schema)?,
+            from_proto_binary_op(&expr.op)?,
+            parse_required_physical_box_expr(&expr.r, registry, "right", input_schema)?,
+            input_schema,
+        )?),
+        ExprType::AggregateExpr(_) => {
+            return Err(DataFusionError::Internal(

Review Comment:
   Maybe these should be `DataFusionError::NotImplemented` errors -- I don't see why they would be internal errors.



##########
datafusion/proto/src/physical_plan/mod.rs:
##########
@@ -0,0 +1,1592 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::convert::TryInto;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use datafusion::arrow::compute::SortOptions;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::file_format::file_type::FileCompressionType;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::execution::FunctionRegistry;
+use datafusion::logical_expr::WindowFrame;
+use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateMode};
+use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
+use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::empty::EmptyExec;
+use datafusion::physical_plan::explain::ExplainExec;
+use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr};
+use datafusion::physical_plan::file_format::{
+    AvroExec, CsvExec, FileScanConfig, ParquetExec,
+};
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
+use datafusion::physical_plan::joins::CrossJoinExec;
+use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
+use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
+use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion::physical_plan::union::UnionExec;
+use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
+use datafusion::physical_plan::{
+    AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
+};
+use datafusion_common::DataFusionError;
+use parking_lot::RwLock;
+use prost::bytes::BufMut;
+use prost::Message;
+
+use crate::common::proto_error;
+use crate::common::{byte_to_string, str_to_byte};
+use crate::from_proto::parse_expr;
+use crate::physical_plan::from_proto::parse_physical_expr;
+use crate::protobuf::physical_expr_node::ExprType;
+use crate::protobuf::physical_plan_node::PhysicalPlanType;
+use crate::protobuf::repartition_exec_node::PartitionMethod;
+use crate::protobuf::{self, PhysicalPlanNode};
+use crate::{convert_required, into_physical_plan, into_required};
+
+pub mod from_proto;
+pub mod to_proto;
+
+impl AsExecutionPlan for PhysicalPlanNode {
+    fn try_decode(buf: &[u8]) -> Result<Self, DataFusionError>
+    where
+        Self: Sized,
+    {
+        protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
+            DataFusionError::Internal(format!("failed to decode physical plan: {:?}", e))
+        })
+    }
+
+    fn try_encode<B>(&self, buf: &mut B) -> Result<(), DataFusionError>
+    where
+        B: BufMut,
+        Self: Sized,
+    {
+        self.encode(buf).map_err(|e| {
+            DataFusionError::Internal(format!("failed to encode physical plan: {:?}", e))
+        })
+    }
+
+    #[allow(clippy::only_used_in_recursion)]
+    fn try_into_physical_plan(
+        &self,
+        registry: &dyn FunctionRegistry,
+        runtime: &RuntimeEnv,
+        extension_codec: &dyn PhysicalExtensionCodec,
+    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
+            proto_error(format!(
+                "physical_plan::from_proto() Unsupported physical plan '{:?}'",
+                self
+            ))
+        })?;
+        match plan {
+            PhysicalPlanType::Explain(explain) => Ok(Arc::new(ExplainExec::new(
+                Arc::new(explain.schema.as_ref().unwrap().try_into()?),
+                explain
+                    .stringified_plans
+                    .iter()
+                    .map(|plan| plan.into())
+                    .collect(),
+                explain.verbose,
+            ))),
+            PhysicalPlanType::Projection(projection) => {
+                let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
+                    projection.input,
+                    registry,
+                    runtime,
+                    extension_codec
+                )?;
+                let exprs = projection
+                    .expr
+                    .iter()
+                    .zip(projection.expr_name.iter())
+                    .map(|(expr, name)| Ok((parse_physical_expr(expr,registry, input.schema().as_ref())?, name.to_string())))
+                    .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>, DataFusionError>>(
+                    )?;
+                Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
+            }
+            PhysicalPlanType::Filter(filter) => {
+                let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
+                    filter.input,
+                    registry,
+                    runtime,
+                    extension_codec
+                )?;
+                let predicate = filter
+                    .expr
+                    .as_ref()
+                    .map(|expr| {
+                        parse_physical_expr(expr, registry, input.schema().as_ref())
+                    })
+                    .transpose()?
+                    .ok_or_else(|| {
+                        DataFusionError::Internal(
+                            "filter (FilterExecNode) in PhysicalPlanNode is missing."
+                                .to_owned(),
+                        )
+                    })?;
+                Ok(Arc::new(FilterExec::try_new(predicate, input)?))
+            }
+            PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
+                decode_scan_config(scan.base_conf.as_ref().unwrap())?,
+                scan.has_header,
+                str_to_byte(&scan.delimiter)?,
+                FileCompressionType::UNCOMPRESSED,
+            ))),
+            PhysicalPlanType::ParquetScan(scan) => {
+                let predicate = scan
+                    .pruning_predicate
+                    .as_ref()
+                    .map(|expr| parse_expr(expr, registry))
+                    .transpose()?;
+                Ok(Arc::new(ParquetExec::new(
+                    decode_scan_config(scan.base_conf.as_ref().unwrap())?,
+                    predicate,
+                    None,
+                )))
+            }
+            PhysicalPlanType::AvroScan(scan) => Ok(Arc::new(AvroExec::new(
+                decode_scan_config(scan.base_conf.as_ref().unwrap())?,
+            ))),
+            PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
+                let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
+                    coalesce_batches.input,
+                    registry,
+                    runtime,
+                    extension_codec
+                )?;
+                Ok(Arc::new(CoalesceBatchesExec::new(
+                    input,
+                    coalesce_batches.target_batch_size as usize,
+                )))
+            }
+            PhysicalPlanType::Merge(merge) => {
+                let input: Arc<dyn ExecutionPlan> =
+                    into_physical_plan!(merge.input, registry, runtime, extension_codec)?;
+                Ok(Arc::new(CoalescePartitionsExec::new(input)))
+            }
+            PhysicalPlanType::Repartition(repart) => {
+                let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
+                    repart.input,
+                    registry,
+                    runtime,
+                    extension_codec
+                )?;
+                match repart.partition_method {
+                    Some(PartitionMethod::Hash(ref hash_part)) => {
+                        let expr = hash_part
+                            .hash_expr
+                            .iter()
+                            .map(|e| {
+                                parse_physical_expr(e, registry, input.schema().as_ref())
+                            })
+                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
+
+                        Ok(Arc::new(RepartitionExec::try_new(
+                            input,
+                            Partitioning::Hash(
+                                expr,
+                                hash_part.partition_count.try_into().unwrap(),
+                            ),
+                        )?))
+                    }
+                    Some(PartitionMethod::RoundRobin(partition_count)) => {
+                        Ok(Arc::new(RepartitionExec::try_new(
+                            input,
+                            Partitioning::RoundRobinBatch(
+                                partition_count.try_into().unwrap(),
+                            ),
+                        )?))
+                    }
+                    Some(PartitionMethod::Unknown(partition_count)) => {
+                        Ok(Arc::new(RepartitionExec::try_new(
+                            input,
+                            Partitioning::UnknownPartitioning(
+                                partition_count.try_into().unwrap(),
+                            ),
+                        )?))
+                    }
+                    _ => Err(DataFusionError::Internal(
+                        "Invalid partitioning scheme".to_owned(),
+                    )),
+                }
+            }
+            PhysicalPlanType::GlobalLimit(limit) => {
+                let input: Arc<dyn ExecutionPlan> =
+                    into_physical_plan!(limit.input, registry, runtime, extension_codec)?;
+                let fetch = if limit.fetch >= 0 {
+                    Some(limit.fetch as usize)
+                } else {
+                    None
+                };
+                Ok(Arc::new(GlobalLimitExec::new(
+                    input,
+                    limit.skip as usize,
+                    fetch,
+                )))
+            }
+            PhysicalPlanType::LocalLimit(limit) => {
+                let input: Arc<dyn ExecutionPlan> =
+                    into_physical_plan!(limit.input, registry, runtime, extension_codec)?;
+                Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
+            }
+            PhysicalPlanType::Window(window_agg) => {
+                let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
+                    window_agg.input,
+                    registry,
+                    runtime,
+                    extension_codec
+                )?;
+                let input_schema = window_agg
+                    .input_schema
+                    .as_ref()
+                    .ok_or_else(|| {
+                        DataFusionError::Internal(
+                            "input_schema in WindowAggrNode is missing.".to_owned(),
+                        )
+                    })?
+                    .clone();
+                let physical_schema: SchemaRef =
+                    SchemaRef::new((&input_schema).try_into()?);
+
+                let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
+                    .window_expr
+                    .iter()
+                    .zip(window_agg.window_expr_name.iter())
+                    .map(|(expr, name)| {
+                        let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
+                            proto_error("Unexpected empty window physical expression")
+                        })?;
+
+                        match expr_type {
+                            ExprType::WindowExpr(window_node) => {
+                                let window_node_expr = window_node
+                                    .expr
+                                    .as_ref()
+                                    .map(|e| {
+                                        parse_physical_expr(
+                                            e.as_ref(),
+                                            registry,
+                                            &physical_schema,
+                                        )
+                                    })
+                                    .transpose()?
+                                    .ok_or_else(|| {
+                                        proto_error(
+                                            "missing window_node expr expression"
+                                                .to_string(),
+                                        )
+                                    })?;
+
+                                Ok(create_window_expr(
+                                    &convert_required!(window_node.window_function)?,
+                                    name.to_owned(),
+                                    &[window_node_expr],
+                                    &[],
+                                    &[],
+                                    Some(Arc::new(WindowFrame::default())),
+                                    &physical_schema,
+                                )?)
+                            }
+                            _ => Err(DataFusionError::Internal(
+                                "Invalid expression for WindowAggrExec".to_string(),
+                            )),
+                        }
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+                //todo fill partition keys and sort keys

Review Comment:
   Is this still a TODO? Perhaps we can file a follow-on ticket.



##########
datafusion/proto/src/common.rs:
##########
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+
+pub fn byte_to_string(b: u8) -> Result<String, DataFusionError> {
+    let b = &[b];
+    let b = std::str::from_utf8(b)
+        .map_err(|_| DataFusionError::Internal("Invalid CSV delimiter".to_owned()))?;

Review Comment:
   Since the only use of this function is for the CSV delimiter, and the error message is specific to that use case, perhaps we could rename this function and its counterpart `str_to_byte` to something more like `csv_delimiter_to_string` and `csv_delimiter_to_bytes`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org