You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/12/08 18:32:01 UTC
(arrow-datafusion) branch main updated: Add primary key support to stream table (#8467)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 34b0445b77 Add primary key support to stream table (#8467)
34b0445b77 is described below
commit 34b0445b778c503f4e65b384ee9ec119ec90044a
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Fri Dec 8 21:31:55 2023 +0300
Add primary key support to stream table (#8467)
---
datafusion/core/src/datasource/stream.rs | 14 +++++++++++-
datafusion/sqllogictest/test_files/groupby.slt | 31 ++++++++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
index 6965968b6f..e7512499eb 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -31,7 +31,7 @@ use async_trait::async_trait;
use futures::StreamExt;
use tokio::task::spawn_blocking;
-use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_plan::common::AbortOnDropSingle;
@@ -100,6 +100,7 @@ pub struct StreamConfig {
encoding: StreamEncoding,
header: bool,
order: Vec<Vec<Expr>>,
+ constraints: Constraints,
}
impl StreamConfig {
@@ -118,6 +119,7 @@ impl StreamConfig {
encoding: StreamEncoding::Csv,
order: vec![],
header: false,
+ constraints: Constraints::empty(),
}
}
@@ -145,6 +147,12 @@ impl StreamConfig {
self
}
+    /// Assign table constraints (e.g. a primary key), replacing any set previously
+    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+        self.constraints = constraints;
+        self
+    }
+
fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
let file = File::open(&self.location)?;
let schema = self.schema.clone();
@@ -215,6 +223,10 @@ impl TableProvider for StreamTable {
self.0.schema.clone()
}
+    fn constraints(&self) -> Option<&Constraints> {
+        Some(&self.0.constraints) // defaults to `Constraints::empty()`; see `StreamConfig::with_constraints`
+    }
+
fn table_type(&self) -> TableType {
TableType::Base
}
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index 5248ac8c85..b7be4d78b5 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -4248,3 +4248,34 @@ set datafusion.sql_parser.dialect = 'Generic';
statement ok
drop table aggregate_test_100;
+
+
+# Create an unbounded external table with primary key
+# column c
+statement ok
+CREATE EXTERNAL TABLE unbounded_multiple_ordered_table_with_pk (
+ a0 INTEGER,
+ a INTEGER,
+ b INTEGER,
+ c INTEGER primary key,
+ d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC)
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv';
+
+# Query below is valid although a is not in GROUP BY, since primary key c determines a.
+query III rowsort
+SELECT c, a, SUM(d)
+FROM unbounded_multiple_ordered_table_with_pk
+GROUP BY c
+ORDER BY c
+LIMIT 5
+----
+0 0 0
+1 0 2
+2 0 0
+3 0 0
+4 0 1