Posted to commits@iceberg.apache.org by op...@apache.org on 2020/11/26 12:30:00 UTC

[iceberg] branch master updated: Flink: Implement the listPartitions method in FlinkCatalog (#1815)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a306e5  Flink: Implement the listPartitions method in FlinkCatalog (#1815)
7a306e5 is described below

commit 7a306e5ff9cb37cd9be5d0693aa74210ee05fa5b
Author: JunZhang <zh...@126.com>
AuthorDate: Thu Nov 26 20:29:48 2020 +0800

    Flink: Implement the listPartitions method in FlinkCatalog (#1815)
---
 .../org/apache/iceberg/flink/FlinkCatalog.java     |  39 ++++++-
 .../apache/iceberg/flink/FlinkCatalogFactory.java  |   4 +-
 .../apache/iceberg/flink/FlinkCatalogTestBase.java |   2 +-
 .../flink/TestFlinkCatalogTablePartitions.java     | 113 +++++++++++++++++++++
 4 files changed, 153 insertions(+), 5 deletions(-)
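
For context, a minimal sketch of what this change enables, mirroring the SQL used in the new tests. It assumes a TableEnvironment `tableEnv` with an Iceberg catalog registered as "iceberg_catalog" (see the FlinkCatalogFactory example further down); the database and table names are illustrative:

    // Create a partitioned Iceberg table, write two partitions, and list them
    // through the catalog API. Before this commit, FlinkCatalog.listPartitions
    // threw UnsupportedOperationException unconditionally.
    tableEnv.executeSql("CREATE TABLE db.tbl (id INT, data VARCHAR) PARTITIONED BY (data)");
    tableEnv.executeSql("INSERT INTO db.tbl SELECT 1, 'a'");
    tableEnv.executeSql("INSERT INTO db.tbl SELECT 2, 'b'");

    FlinkCatalog catalog = (FlinkCatalog) tableEnv.getCatalog("iceberg_catalog").get();
    List<CatalogPartitionSpec> partitions = catalog.listPartitions(new ObjectPath("db", "tbl"));
    // partitions now holds two specs, e.g. {data=a} and {data=b}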

diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index c71c69f..2fd5696 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -46,15 +46,19 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.StringUtils;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdateProperties;
@@ -65,6 +69,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -88,6 +93,7 @@ public class FlinkCatalog extends AbstractCatalog {
   private final String[] baseNamespace;
   private final SupportsNamespaces asNamespaceCatalog;
   private final Closeable closeable;
+  private final boolean cacheEnabled;
 
   // TODO - Update baseNamespace to use Namespace class
   // https://github.com/apache/iceberg/issues/1541
@@ -100,6 +106,7 @@ public class FlinkCatalog extends AbstractCatalog {
     super(catalogName, defaultDatabase);
     this.catalogLoader = catalogLoader;
     this.baseNamespace = baseNamespace;
+    this.cacheEnabled = cacheEnabled;
 
     Catalog originalCatalog = catalogLoader.loadCatalog();
     icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
@@ -303,7 +310,12 @@ public class FlinkCatalog extends AbstractCatalog {
 
   Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
     try {
-      return icebergCatalog.loadTable(toIdentifier(tablePath));
+      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
+      if (cacheEnabled) {
+        table.refresh();
+      }
+
+      return table;
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new TableNotExistException(getName(), tablePath, e);
     }
@@ -618,8 +630,29 @@ public class FlinkCatalog extends AbstractCatalog {
 
   @Override
   public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+      throws TableNotExistException, TableNotPartitionedException, CatalogException {
+    Table table = loadIcebergTable(tablePath);
+
+    if (table.spec().isUnpartitioned()) {
+      throw new TableNotPartitionedException(icebergCatalog.name(), tablePath);
+    }
+
+    Set<CatalogPartitionSpec> set = Sets.newHashSet();
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) {
+        Map<String, String> map = Maps.newHashMap();
+        StructLike structLike = dataFile.partition();
+        PartitionSpec spec = table.specs().get(dataFile.specId());
+        for (int i = 0; i < structLike.size(); i++) {
+          map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class)));
+        }
+        set.add(new CatalogPartitionSpec(map));
+      }
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Failed to list partitions of table %s", tablePath), e);
+    }
+
+    return Lists.newArrayList(set);
   }
 
   @Override
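
A sketch of calling the new method directly (the catalog and table names are illustrative). The contract visible in the hunk above: an unpartitioned table raises TableNotPartitionedException, and one CatalogPartitionSpec comes back per distinct partition tuple, however many data files share it, because the specs are de-duplicated through a HashSet (so the order of the returned list is unspecified):

    import java.util.List;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.CatalogPartitionSpec;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.exceptions.TableNotExistException;
    import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
    import org.apache.iceberg.flink.FlinkCatalog;

    static List<CatalogPartitionSpec> partitionsOf(TableEnvironment env)
        throws TableNotExistException, TableNotPartitionedException {
      // The cast assumes the registered catalog is Iceberg's FlinkCatalog.
      FlinkCatalog catalog = (FlinkCatalog) env.getCatalog("iceberg_catalog").get();
      // Scans the table's data files and returns its distinct partitions;
      // throws TableNotPartitionedException for an unpartitioned table.
      return catalog.listPartitions(new ObjectPath("db", "tbl"));
    }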
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 483fa9b..ac092af 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -66,6 +66,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String HIVE_CONF_DIR = "hive-conf-dir";
   public static final String DEFAULT_DATABASE = "default-database";
   public static final String BASE_NAMESPACE = "base-namespace";
+  public static final String CACHE_ENABLED = "cache-enabled";
 
   /**
    * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
@@ -117,6 +118,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
     properties.add(CatalogProperties.WAREHOUSE_LOCATION);
     properties.add(CatalogProperties.HIVE_URI);
     properties.add(CatalogProperties.HIVE_CLIENT_POOL_SIZE);
+    properties.add(CACHE_ENABLED);
     return properties;
   }
 
@@ -131,7 +133,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
     String[] baseNamespace = properties.containsKey(BASE_NAMESPACE) ?
         Splitter.on('.').splitToList(properties.get(BASE_NAMESPACE)).toArray(new String[0]) :
         new String[0];
-    boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault("cache-enabled", "true"));
+    boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
     return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
   }
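
The factory now exposes the previously hard-coded "cache-enabled" key as a named constant and a supported catalog property. A sketch of registering a catalog with caching turned off, given a TableEnvironment `tableEnv` (the catalog name, catalog-type, and warehouse path are illustrative):

    // 'cache-enabled' defaults to "true". With "false", FlinkCatalog loads
    // tables straight from the underlying catalog instead of wrapping it in
    // CachingCatalog; with "true", loadIcebergTable refreshes cached tables.
    tableEnv.executeSql(
        "CREATE CATALOG iceberg_catalog WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='file:///tmp/warehouse'," +
        "'cache-enabled'='false')");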
 
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index 9ec4dcd..fd6181b 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -84,7 +84,7 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
   protected final String[] baseNamespace;
   protected final Catalog validationCatalog;
   protected final SupportsNamespaces validationNamespaceCatalog;
-  private final Map<String, String> config = Maps.newHashMap();
+  protected final Map<String, String> config = Maps.newHashMap();
 
   protected final String flinkDatabase;
   protected final Namespace icebergNamespace;
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
new file mode 100644
index 0000000..3291934
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.flink.FlinkCatalogFactory.CACHE_ENABLED;
+
+public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
+
+  private String tableName = "test_table";
+
+  private final FileFormat format;
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, cacheEnabled={3}")
+  public static Iterable<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) {
+      for (Boolean cacheEnabled : new Boolean[] {true, false}) {
+        for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+          String catalogName = (String) catalogParams[0];
+          String[] baseNamespace = (String[]) catalogParams[1];
+          parameters.add(new Object[] {catalogName, baseNamespace, format, cacheEnabled});
+        }
+      }
+    }
+    return parameters;
+  }
+
+  public TestFlinkCatalogTablePartitions(String catalogName, String[] baseNamespace, FileFormat format,
+                                         boolean cacheEnabled) {
+    super(catalogName, baseNamespace);
+    this.format = format;
+    config.put(CACHE_ENABLED, String.valueOf(cacheEnabled));
+  }
+
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+  }
+
+  @After
+  public void cleanNamespaces() {
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  @Test
+  public void testListPartitionsWithUnpartitionedTable() {
+    sql("CREATE TABLE %s (id INT, data VARCHAR) with ('write.format.default'='%s')",
+        tableName, format.name());
+    sql("INSERT INTO %s SELECT 1,'a'", tableName);
+
+    ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
+    FlinkCatalog flinkCatalog = (FlinkCatalog) getTableEnv().getCatalog(catalogName).get();
+    AssertHelpers.assertThrows("Should not list partitions for unpartitioned table.",
+        TableNotPartitionedException.class, () -> flinkCatalog.listPartitions(objectPath));
+  }
+
+  @Test
+  public void testListPartitionsWithPartitionedTable() throws TableNotExistException, TableNotPartitionedException {
+    sql("CREATE TABLE %s (id INT, data VARCHAR) PARTITIONED BY (data) " +
+        "with ('write.format.default'='%s')", tableName, format.name());
+    sql("INSERT INTO %s SELECT 1,'a'", tableName);
+    sql("INSERT INTO %s SELECT 2,'b'", tableName);
+
+    ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
+    FlinkCatalog flinkCatalog = (FlinkCatalog) getTableEnv().getCatalog(catalogName).get();
+    List<CatalogPartitionSpec> list = flinkCatalog.listPartitions(objectPath);
+    Assert.assertEquals("Should have 2 partition", 2, list.size());
+
+    List<CatalogPartitionSpec> expected = Lists.newArrayList();
+    CatalogPartitionSpec partitionSpec1 = new CatalogPartitionSpec(ImmutableMap.of("data", "a"));
+    CatalogPartitionSpec partitionSpec2 = new CatalogPartitionSpec(ImmutableMap.of("data", "b"));
+    expected.add(partitionSpec1);
+    expected.add(partitionSpec2);
+    Assert.assertEquals("Should produce the expected catalog partition specs.", list, expected);
+  }
+}