You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/28 09:47:52 UTC

[flink-table-store] branch release-0.2 updated: [hotfix] Set defaultNamespace for Spark Catalog

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new c074d06c [hotfix] Set defaultNamespace for Spark Catalog
c074d06c is described below

commit c074d06c74e8228a2a608fdae2e401dca230671a
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jul 28 17:47:13 2022 +0800

    [hotfix] Set defaultNamespace for Spark Catalog
    
    This closes #251
---
 .../apache/flink/table/store/connector/FlinkCatalogFactory.java    | 5 ++++-
 .../java/org/apache/flink/table/store/file/catalog/Catalog.java    | 2 ++
 .../main/java/org/apache/flink/table/store/spark/SparkCatalog.java | 5 +++++
 .../java/org/apache/flink/table/store/spark/SparkReadITCase.java   | 7 +++++++
 4 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index 1811b3e8..9c70ffe9 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.connector;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
 
 import java.util.Collections;
@@ -32,7 +33,9 @@ public class FlinkCatalogFactory implements org.apache.flink.table.factories.Cat
     public static final String IDENTIFIER = "table-store";
 
     public static final ConfigOption<String> DEFAULT_DATABASE =
-            ConfigOptions.key("default-database").stringType().defaultValue("default");
+            ConfigOptions.key("default-database")
+                    .stringType()
+                    .defaultValue(Catalog.DEFAULT_DATABASE);
 
     @Override
     public String factoryIdentifier() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 5d46aa07..2d3887c8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -35,6 +35,8 @@ import java.util.Optional;
  */
 public interface Catalog extends AutoCloseable {
 
+    String DEFAULT_DATABASE = "default";
+
     /**
      * Get lock factory from catalog. Lock is used to support multiple concurrent writes on the
      * object store.
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index e64150a5..d4db28db 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -77,6 +77,11 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         return name;
     }
 
+    @Override
+    public String[] defaultNamespace() {
+        return new String[] {Catalog.DEFAULT_DATABASE};
+    }
+
     @Override
     public void createNamespace(String[] namespace, Map<String, String> metadata)
             throws NamespaceAlreadyExistsException {
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 7c2ef33a..c2b36e48 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -315,6 +315,13 @@ public class SparkReadITCase {
                 .isTrue();
     }
 
+    @Test
+    public void testDefaultNamespace() {
+        spark.sql("USE tablestore");
+        assertThat(spark.sql("SHOW CURRENT NAMESPACE").collectAsList().toString())
+                .isEqualTo("[[tablestore,default]]");
+    }
+
     @Test
     public void testAlterPrimaryKeyNullability() {
         spark.sql("USE tablestore");