Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/27 08:59:21 UTC

[flink-table-store] branch master updated: [FLINK-28703] Describe partitioned spark table lost partition info

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a251033e [FLINK-28703] Describe partitioned spark table lost partition info
a251033e is described below

commit a251033ece720a5fc571f36f5335c83715dc07de
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Wed Jul 27 16:59:17 2022 +0800

    [FLINK-28703] Describe partitioned spark table lost partition info
    
    This closes #246
---
 .../org/apache/flink/table/store/spark/SparkTable.java     | 11 +++++++++++
 .../apache/flink/table/store/spark/SparkReadITCase.java    | 14 ++++++++++++++
 2 files changed, 25 insertions(+)

diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index c5ec1a18..8cf39668 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -23,6 +23,9 @@ import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.spark.sql.catalog.Table;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.IdentityTransform;
+import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -61,4 +64,12 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
         capabilities.add(TableCapability.BATCH_READ);
         return capabilities;
     }
+
+    @Override
+    public Transform[] partitioning() {
+        return table.schema().partitionKeys().stream()
+                .map(FieldReference::apply)
+                .map(IdentityTransform::apply)
+                .toArray(Transform[]::new);
+    }
 }
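
The added partitioning() override maps each partition key to an identity transform; Spark's DESCRIBE command reads these transforms to render the "# Partitioning" rows. A minimal standalone sketch of the same mapping, using Spark's public Expressions helper instead of the Scala companion apply methods used above (the class and method names here are illustrative, not part of this commit):

    import java.util.List;

    import org.apache.spark.sql.connector.expressions.Expressions;
    import org.apache.spark.sql.connector.expressions.Transform;

    class PartitioningSketch {
        // Turn partition key names into identity transforms, mirroring
        // what partitioning() does in SparkTable above.
        static Transform[] identityTransforms(List<String> partitionKeys) {
            return partitionKeys.stream()
                    .map(Expressions::identity) // identity(col) == IdentityTransform(FieldReference(col))
                    .toArray(Transform[]::new);
        }
    }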
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 93056d9f..7c2ef33a 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -386,6 +386,20 @@ public class SparkReadITCase {
         assertThat(schema.logicalRowType().getTypeAt(0).isNullable()).isFalse();
     }
 
+    @Test
+    public void testDescribeTable() {
+        spark.sql("USE tablestore");
+        spark.sql(
+                "CREATE TABLE default.PartitionedTable (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING) USING tablestore\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a)\n"
+                        + "TBLPROPERTIES ('foo' = 'bar')");
+        assertThat(spark.sql("DESCRIBE default.PartitionedTable").collectAsList().toString())
+                .isEqualTo("[[a,bigint,], [b,string,], [,,], [# Partitioning,,], [Part 0,a,]]");
+    }
+
     @Test
     public void testCreateTableWithInvalidPk() {
         spark.sql("USE tablestore");
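
With partitioning() in place, DESCRIBE TABLE output for a partitioned table ends with the "# Partitioning" section asserted in the test above. A hedged usage sketch (the table name is illustrative; spark is an existing SparkSession wired to the tablestore catalog):

    spark.sql("USE tablestore");
    spark.sql(
            "CREATE TABLE default.PartitionedTable (a BIGINT, b STRING) "
                    + "USING tablestore PARTITIONED BY (a)");
    // Before this commit the partition rows were missing; the output now ends with:
    //   # Partitioning
    //   Part 0 | a
    spark.sql("DESCRIBE default.PartitionedTable").show(false);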