You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/12/08 18:32:01 UTC

(arrow-datafusion) branch main updated: Add primary key support to stream table (#8467)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 34b0445b77 Add primary key support to stream table (#8467)
34b0445b77 is described below

commit 34b0445b778c503f4e65b384ee9ec119ec90044a
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Fri Dec 8 21:31:55 2023 +0300

    Add primary key support to stream table (#8467)
---
 datafusion/core/src/datasource/stream.rs       | 14 +++++++++++-
 datafusion/sqllogictest/test_files/groupby.slt | 31 ++++++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
index 6965968b6f..e7512499eb 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -31,7 +31,7 @@ use async_trait::async_trait;
 use futures::StreamExt;
 use tokio::task::spawn_blocking;
 
-use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::{CreateExternalTable, Expr, TableType};
 use datafusion_physical_plan::common::AbortOnDropSingle;
@@ -100,6 +100,7 @@ pub struct StreamConfig {
     encoding: StreamEncoding,
     header: bool,
     order: Vec<Vec<Expr>>,
+    constraints: Constraints,
 }
 
 impl StreamConfig {
@@ -118,6 +119,7 @@ impl StreamConfig {
             encoding: StreamEncoding::Csv,
             order: vec![],
             header: false,
+            constraints: Constraints::empty(),
         }
     }
 
@@ -145,6 +147,12 @@ impl StreamConfig {
         self
     }
 
+    /// Specify the table constraints (e.g. a primary key) for this stream
+    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+        self.constraints = constraints;
+        self
+    }
+
     fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
         let file = File::open(&self.location)?;
         let schema = self.schema.clone();
@@ -215,6 +223,10 @@ impl TableProvider for StreamTable {
         self.0.schema.clone()
     }
 
+    fn constraints(&self) -> Option<&Constraints> {
+        Some(&self.0.constraints)
+    }
+
     fn table_type(&self) -> TableType {
         TableType::Base
     }
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index 5248ac8c85..b7be4d78b5 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -4248,3 +4248,34 @@ set datafusion.sql_parser.dialect = 'Generic';
 
 statement ok
 drop table aggregate_test_100;
+
+
+# Create an unbounded external table with primary key
+# column c
+statement ok
+CREATE EXTERNAL TABLE unbounded_multiple_ordered_table_with_pk (
+  a0 INTEGER,
+  a INTEGER,
+  b INTEGER,
+  c INTEGER primary key,
+  d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC)
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv';
+
+# The query below can be executed because c is the primary key, so column a is functionally dependent on the GROUP BY column c.
+query III rowsort
+SELECT c, a, SUM(d)
+FROM unbounded_multiple_ordered_table_with_pk
+GROUP BY c
+ORDER BY c
+LIMIT 5
+----
+0 0 0
+1 0 2
+2 0 0
+3 0 0
+4 0 1