Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/11 17:35:19 UTC

[GitHub] [arrow-datafusion] tustvold opened a new pull request, #2202: Make ParquetExec usable outside of a tokio runtime (#2201)

tustvold opened a new pull request, #2202:
URL: https://github.com/apache/arrow-datafusion/pull/2202

   # Which issue does this PR close?
   
   Part of #2201.
   
   # Rationale for this change
   
   See #2201.
   
   # What changes are included in this PR?
   
   Allows `ParquetExec` to be used outside of a tokio context.
   
   # Are there any user-facing changes?
   `ParquetExec` will no longer panic if used outside of a tokio context.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2202: Make ParquetExec usable outside of a tokio runtime (#2201)

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2202:
URL: https://github.com/apache/arrow-datafusion/pull/2202#discussion_r847580201


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -454,141 +587,6 @@ fn build_row_group_predicate(
     )
 }
 
-fn read_partition_no_file_columns(

Review Comment:
   This logic has been moved into the `Iterator` implementation for [`ParquetExecStream`] and reformulated there. This simplifies the code and also allows it to be used without an intermediate buffer.
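
To illustrate the point about the intermediate buffer, here is a minimal standard-library sketch, not the code from this PR. It assumes a hypothetical helper `drive` and a hypothetical `use_worker` flag standing in for "a runtime is available"; a plain thread and `std::sync::mpsc::sync_channel` stand in for tokio's `spawn_blocking` and bounded channel. Once the reading logic is an ordinary iterator, the caller can either consume it directly or move it to a worker and read the results through a small bounded channel.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical helper: consume `source` directly, or drive it on a worker
/// thread behind a bounded channel. `use_worker` is an illustrative flag,
/// not something taken from DataFusion.
pub fn drive<I>(source: I, use_worker: bool) -> Box<dyn Iterator<Item = I::Item>>
where
    I: Iterator + Send + 'static,
    I::Item: Send + 'static,
{
    if use_worker {
        // Capacity 2 mirrors the channel size used in the diff.
        let (tx, rx) = sync_channel(2);
        thread::spawn(move || {
            for item in source {
                // A send error means the receiver was dropped, so stop early.
                if tx.send(item).is_err() {
                    break;
                }
            }
        });
        Box::new(rx.into_iter())
    } else {
        // No worker and no buffer: the caller pulls items straight
        // from the iterator, blocking on each call to `next`.
        Box::new(source)
    }
}
```

Calling `drive(1..=5, false)` and `drive(1..=5, true)` yields the same items in the same order; only where the blocking work runs differs.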





[GitHub] [arrow-datafusion] tustvold merged pull request #2202: Make ParquetExec usable outside of a tokio runtime (#2201)

Posted by GitBox <gi...@apache.org>.
tustvold merged PR #2202:
URL: https://github.com/apache/arrow-datafusion/pull/2202




[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2202: Make ParquetExec usable outside of a tokio runtime (#2201)

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2202:
URL: https://github.com/apache/arrow-datafusion/pull/2202#discussion_r847878050


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -203,75 +200,51 @@ impl ExecutionPlan for ParquetExec {
         partition_index: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        // because the parquet implementation is not thread-safe, it is necessary to execute
-        // on a thread and communicate with channels
-        let (response_tx, response_rx): (
-            Sender<ArrowResult<RecordBatch>>,
-            Receiver<ArrowResult<RecordBatch>>,
-        ) = channel(2);
-
-        let partition = self.base_config.file_groups[partition_index].clone();
-        let metrics = self.metrics.clone();
         let projection = match self.base_config.file_column_projection_indices() {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
-        let pruning_predicate = self.pruning_predicate.clone();
-        let batch_size = context.session_config().batch_size;
-        let limit = self.base_config.limit;
-        let object_store = Arc::clone(&self.base_config.object_store);
         let partition_col_proj = PartitionColumnProjector::new(
             Arc::clone(&self.projected_schema),
             &self.base_config.table_partition_cols,
         );
 
-        let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
-
-        let join_handle = task::spawn_blocking(move || {
-            let res = if projection.is_empty() {
-                read_partition_no_file_columns(
-                    object_store.as_ref(),
-                    &partition,
-                    batch_size,
-                    response_tx.clone(),
-                    limit,
-                    partition_col_proj,
-                )
-            } else {
-                read_partition(
-                    object_store.as_ref(),
-                    adapter,
-                    partition_index,
-                    &partition,
-                    metrics,
-                    &projection,
-                    &pruning_predicate,
-                    batch_size,
-                    response_tx.clone(),
-                    limit,
-                    partition_col_proj,
-                )
-            };
+        let stream = ParquetExecStream {
+            error: false,
+            partition_index,
+            metrics: self.metrics.clone(),
+            object_store: self.base_config.object_store.clone(),
+            pruning_predicate: self.pruning_predicate.clone(),
+            batch_size: context.session_config().batch_size,
+            schema: self.projected_schema.clone(),
+            projection,
+            remaining_rows: self.base_config.limit,
+            reader: None,
+            files: self.base_config.file_groups[partition_index].clone().into(),
+            projector: partition_col_proj,
+            adapter: SchemaAdapter::new(self.base_config.file_schema.clone()),
+        };
 
-            if let Err(e) = res {
-                warn!(
-                    "Parquet reader thread terminated due to error: {:?} for files: {:?}",
-                    e, partition
-                );
-                // Send the error back to the main thread.
-                //
-                // Ignore error sending (via `.ok()`) because that
-                // means the receiver has been torn down (and nothing
-                // cares about the errors anymore)
-                send_result(&response_tx, Err(e.into())).ok();
+        // Use spawn_blocking only if running from a tokio context (#2201)
+        match tokio::runtime::Handle::try_current() {
+            Ok(handle) => {
+                let (response_tx, response_rx) = tokio::sync::mpsc::channel(2);
+                let schema = stream.schema();
+                let join_handle = handle.spawn_blocking(move || {
+                    for result in stream {
+                        if response_tx.blocking_send(result).is_err() {
+                            break;
+                        }
+                    }
+                });
+                Ok(RecordBatchReceiverStream::create(
+                    &schema,
+                    response_rx,
+                    join_handle,
+                ))
             }
-        });
-
-        Ok(RecordBatchReceiverStream::create(
-            &self.projected_schema,
-            response_rx,
-            join_handle,
-        ))
+            Err(_) => Ok(Box::pin(stream)),

Review Comment:
   👍



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -312,15 +285,175 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
-fn send_result(
-    response_tx: &Sender<ArrowResult<RecordBatch>>,
-    result: ArrowResult<RecordBatch>,
-) -> Result<()> {
-    // Note this function is running on its own blockng tokio thread so blocking here is ok.
-    response_tx
-        .blocking_send(result)
-        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
-    Ok(())
+/// Special-case empty column projection
+///
+/// This is a workaround for https://github.com/apache/arrow-rs/issues/1537
+enum ProjectedReader {
+    Reader {
+        reader: ParquetRecordBatchReader,
+    },
+    EmptyProjection {
+        remaining_rows: usize,
+        batch_size: usize,
+    },
+}
+
+/// Implements [`RecordBatchStream`] for a collection of [`PartitionedFile`]
+///
+/// NB: This will perform blocking IO synchronously without yielding which may

Review Comment:
   👍
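
For readers following the `EmptyProjection` variant in the hunk above, here is a rough sketch of how such a state could be turned into batches. This is an assumption about the intent and not the PR's implementation: the struct name `EmptyProjectionRows` is hypothetical, and it yields plain row counts where the real code would presumably build column-less record batches. Only the field names `remaining_rows` and `batch_size` come from the diff.

```rust
/// Hypothetical stand-in for the `EmptyProjection` state: no columns are
/// read, so only the number of rows per batch has to be tracked.
pub struct EmptyProjectionRows {
    pub remaining_rows: usize,
    pub batch_size: usize,
}

impl Iterator for EmptyProjectionRows {
    /// Row count of the next batch.
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        // Treat a zero batch size as "nothing to yield" so the iterator
        // always terminates (a choice made for this sketch).
        if self.remaining_rows == 0 || self.batch_size == 0 {
            return None;
        }
        let rows = self.remaining_rows.min(self.batch_size);
        self.remaining_rows -= rows;
        Some(rows)
    }
}
```

With `remaining_rows: 10` and `batch_size: 4`, the iterator produces batches of 4, 4, and 2 rows and then ends.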


