You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/12 02:12:16 UTC

[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2202: Make ParquetExec usable outside of a tokio runtime (#2201)

yjshen commented on code in PR #2202:
URL: https://github.com/apache/arrow-datafusion/pull/2202#discussion_r847878050


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -203,75 +200,51 @@ impl ExecutionPlan for ParquetExec {
         partition_index: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        // because the parquet implementation is not thread-safe, it is necessary to execute
-        // on a thread and communicate with channels
-        let (response_tx, response_rx): (
-            Sender<ArrowResult<RecordBatch>>,
-            Receiver<ArrowResult<RecordBatch>>,
-        ) = channel(2);
-
-        let partition = self.base_config.file_groups[partition_index].clone();
-        let metrics = self.metrics.clone();
         let projection = match self.base_config.file_column_projection_indices() {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
-        let pruning_predicate = self.pruning_predicate.clone();
-        let batch_size = context.session_config().batch_size;
-        let limit = self.base_config.limit;
-        let object_store = Arc::clone(&self.base_config.object_store);
         let partition_col_proj = PartitionColumnProjector::new(
             Arc::clone(&self.projected_schema),
             &self.base_config.table_partition_cols,
         );
 
-        let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
-
-        let join_handle = task::spawn_blocking(move || {
-            let res = if projection.is_empty() {
-                read_partition_no_file_columns(
-                    object_store.as_ref(),
-                    &partition,
-                    batch_size,
-                    response_tx.clone(),
-                    limit,
-                    partition_col_proj,
-                )
-            } else {
-                read_partition(
-                    object_store.as_ref(),
-                    adapter,
-                    partition_index,
-                    &partition,
-                    metrics,
-                    &projection,
-                    &pruning_predicate,
-                    batch_size,
-                    response_tx.clone(),
-                    limit,
-                    partition_col_proj,
-                )
-            };
+        let stream = ParquetExecStream {
+            error: false,
+            partition_index,
+            metrics: self.metrics.clone(),
+            object_store: self.base_config.object_store.clone(),
+            pruning_predicate: self.pruning_predicate.clone(),
+            batch_size: context.session_config().batch_size,
+            schema: self.projected_schema.clone(),
+            projection,
+            remaining_rows: self.base_config.limit,
+            reader: None,
+            files: self.base_config.file_groups[partition_index].clone().into(),
+            projector: partition_col_proj,
+            adapter: SchemaAdapter::new(self.base_config.file_schema.clone()),
+        };
 
-            if let Err(e) = res {
-                warn!(
-                    "Parquet reader thread terminated due to error: {:?} for files: {:?}",
-                    e, partition
-                );
-                // Send the error back to the main thread.
-                //
-                // Ignore error sending (via `.ok()`) because that
-                // means the receiver has been torn down (and nothing
-                // cares about the errors anymore)
-                send_result(&response_tx, Err(e.into())).ok();
+        // Use spawn_blocking only if running from a tokio context (#2201)
+        match tokio::runtime::Handle::try_current() {
+            Ok(handle) => {
+                let (response_tx, response_rx) = tokio::sync::mpsc::channel(2);
+                let schema = stream.schema();
+                let join_handle = handle.spawn_blocking(move || {
+                    for result in stream {
+                        if response_tx.blocking_send(result).is_err() {
+                            break;
+                        }
+                    }
+                });
+                Ok(RecordBatchReceiverStream::create(
+                    &schema,
+                    response_rx,
+                    join_handle,
+                ))
             }
-        });
-
-        Ok(RecordBatchReceiverStream::create(
-            &self.projected_schema,
-            response_rx,
-            join_handle,
-        ))
+            Err(_) => Ok(Box::pin(stream)),

Review Comment:
   👍



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -312,15 +285,175 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
-fn send_result(
-    response_tx: &Sender<ArrowResult<RecordBatch>>,
-    result: ArrowResult<RecordBatch>,
-) -> Result<()> {
-    // Note this function is running on its own blocking tokio thread so blocking here is ok.
-    response_tx
-        .blocking_send(result)
-        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
-    Ok(())
+/// Special-case empty column projection
+///
+/// This is a workaround for https://github.com/apache/arrow-rs/issues/1537
+enum ProjectedReader {
+    Reader {
+        reader: ParquetRecordBatchReader,
+    },
+    EmptyProjection {
+        remaining_rows: usize,
+        batch_size: usize,
+    },
+}
+
+/// Implements [`RecordBatchStream`] for a collection of [`PartitionedFile`]
+///
+/// NB: This will perform blocking IO synchronously without yielding which may

Review Comment:
   👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org