You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/27 08:59:21 UTC
[flink-table-store] branch master updated: [FLINK-28703] Describe partitioned spark table lost partition info
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new a251033e [FLINK-28703] Describe partitioned spark table lost partition info
a251033e is described below
commit a251033ece720a5fc571f36f5335c83715dc07de
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Wed Jul 27 16:59:17 2022 +0800
[FLINK-28703] Describe partitioned spark table lost partition info
This closes #246
---
.../org/apache/flink/table/store/spark/SparkTable.java | 11 +++++++++++
.../apache/flink/table/store/spark/SparkReadITCase.java | 14 ++++++++++++++
2 files changed, 25 insertions(+)
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index c5ec1a18..8cf39668 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -23,6 +23,9 @@ import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.spark.sql.catalog.Table;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.IdentityTransform;
+import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -61,4 +64,12 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
capabilities.add(TableCapability.BATCH_READ);
return capabilities;
}
+
+ @Override
+ public Transform[] partitioning() { // Exposes the table's partition keys to Spark (FLINK-28703: DESCRIBE lost partition info).
+ return table.schema().partitionKeys().stream()
+ .map(FieldReference::apply) // partition key -> field reference
+ .map(IdentityTransform::apply) // field reference -> identity transform on that field
+ .toArray(Transform[]::new); // one Transform per partition key
+ }
}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 93056d9f..7c2ef33a 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -386,6 +386,20 @@ public class SparkReadITCase {
assertThat(schema.logicalRowType().getTypeAt(0).isNullable()).isFalse();
}
+ @Test
+ public void testDescribeTable() { // Checks that DESCRIBE on a partitioned table reports its partitioning.
+ spark.sql("USE tablestore");
+ spark.sql(
+ "CREATE TABLE default.PartitionedTable (\n"
+ + "a BIGINT,\n"
+ + "b STRING) USING tablestore\n"
+ + "COMMENT 'table comment'\n"
+ + "PARTITIONED BY (a)\n" // single partition column: a
+ + "TBLPROPERTIES ('foo' = 'bar')");
+ assertThat(spark.sql("DESCRIBE default.PartitionedTable").collectAsList().toString())
+ .isEqualTo("[[a,bigint,], [b,string,], [,,], [# Partitioning,,], [Part 0,a,]]"); // expected rows: columns a and b, an empty row, then the partitioning section naming a
+ }
+
@Test
public void testCreateTableWithInvalidPk() {
spark.sql("USE tablestore");