Posted to commits@iceberg.apache.org by op...@apache.org on 2020/11/26 12:30:00 UTC
[iceberg] branch master updated: Flink : Implement the listPartitions method in FlinkCatalog (#1815)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7a306e5 Flink : Implement the listPartitions method in FlinkCatalog (#1815)
7a306e5 is described below
commit 7a306e5ff9cb37cd9be5d0693aa74210ee05fa5b
Author: JunZhang <zh...@126.com>
AuthorDate: Thu Nov 26 20:29:48 2020 +0800
Flink : Implement the listPartitions method in FlinkCatalog (#1815)
---
.../org/apache/iceberg/flink/FlinkCatalog.java | 39 ++++++-
.../apache/iceberg/flink/FlinkCatalogFactory.java | 4 +-
.../apache/iceberg/flink/FlinkCatalogTestBase.java | 2 +-
.../flink/TestFlinkCatalogTablePartitions.java | 113 +++++++++++++++++++++
4 files changed, 153 insertions(+), 5 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index c71c69f..2fd5696 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -46,15 +46,19 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.util.StringUtils;
import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
@@ -65,6 +69,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -88,6 +93,7 @@ public class FlinkCatalog extends AbstractCatalog {
private final String[] baseNamespace;
private final SupportsNamespaces asNamespaceCatalog;
private final Closeable closeable;
+ private final boolean cacheEnabled;
// TODO - Update baseNamespace to use Namespace class
// https://github.com/apache/iceberg/issues/1541
@@ -100,6 +106,7 @@ public class FlinkCatalog extends AbstractCatalog {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.baseNamespace = baseNamespace;
+ this.cacheEnabled = cacheEnabled;
Catalog originalCatalog = catalogLoader.loadCatalog();
icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
@@ -303,7 +310,12 @@ public class FlinkCatalog extends AbstractCatalog {
Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
try {
- return icebergCatalog.loadTable(toIdentifier(tablePath));
+ Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
+ if (cacheEnabled) {
+ table.refresh();
+ }
+
+ return table;
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new TableNotExistException(getName(), tablePath, e);
}
@@ -618,8 +630,29 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
- throws CatalogException {
- throw new UnsupportedOperationException();
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ Table table = loadIcebergTable(tablePath);
+
+ if (table.spec().isUnpartitioned()) {
+ throw new TableNotPartitionedException(icebergCatalog.name(), tablePath);
+ }
+
+ Set<CatalogPartitionSpec> set = Sets.newHashSet();
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) {
+ Map<String, String> map = Maps.newHashMap();
+ StructLike structLike = dataFile.partition();
+ PartitionSpec spec = table.specs().get(dataFile.specId());
+ for (int i = 0; i < structLike.size(); i++) {
+ map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class)));
+ }
+ set.add(new CatalogPartitionSpec(map));
+ }
+ } catch (IOException e) {
+ throw new CatalogException(String.format("Failed to list partitions of table %s", tablePath), e);
+ }
+
+ return Lists.newArrayList(set);
}
@Override
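For context, a minimal sketch of how the newly implemented listPartitions can be exercised through the Flink catalog API. The table environment tEnv, the catalog name, and the database/table names below are illustrative assumptions, not part of this commit:

import java.util.List;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;

// Assumed setup: "iceberg_catalog" is registered with the TableEnvironment tEnv,
// and db.partitioned_table is an existing partitioned Iceberg table. The checked
// TableNotExistException / TableNotPartitionedException must be handled or declared.
Catalog catalog = tEnv.getCatalog("iceberg_catalog").get();
List<CatalogPartitionSpec> partitions =
    catalog.listPartitions(new ObjectPath("db", "partitioned_table"));
for (CatalogPartitionSpec spec : partitions) {
  // Each spec maps partition field names to string values, e.g. {data=a}.
  System.out.println(spec.getPartitionSpec());
}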
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 483fa9b..ac092af 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -66,6 +66,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String HIVE_CONF_DIR = "hive-conf-dir";
public static final String DEFAULT_DATABASE = "default-database";
public static final String BASE_NAMESPACE = "base-namespace";
+ public static final String CACHE_ENABLED = "cache-enabled";
/**
* Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
@@ -117,6 +118,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
properties.add(CatalogProperties.WAREHOUSE_LOCATION);
properties.add(CatalogProperties.HIVE_URI);
properties.add(CatalogProperties.HIVE_CLIENT_POOL_SIZE);
+ properties.add(CACHE_ENABLED);
return properties;
}
@@ -131,7 +133,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
String[] baseNamespace = properties.containsKey(BASE_NAMESPACE) ?
Splitter.on('.').splitToList(properties.get(BASE_NAMESPACE)).toArray(new String[0]) :
new String[0];
- boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault("cache-enabled", "true"));
+ boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
}
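A minimal sketch of wiring the new CACHE_ENABLED property through the factory. The catalog type ("hadoop") and warehouse path are assumptions chosen for illustration; only "cache-enabled" itself is introduced by this commit:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.catalog.Catalog;
import org.apache.iceberg.flink.FlinkCatalogFactory;

Map<String, String> properties = new HashMap<>();
properties.put("type", "iceberg");                     // required for Flink's factory discovery
properties.put("catalog-type", "hadoop");              // assumed: a Hadoop catalog backend
properties.put("warehouse", "file:///tmp/warehouse");  // CatalogProperties.WAREHOUSE_LOCATION
properties.put(FlinkCatalogFactory.CACHE_ENABLED, "false"); // disables the CachingCatalog wrapper
Catalog catalog = new FlinkCatalogFactory().createCatalog("iceberg_catalog", properties);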
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index 9ec4dcd..fd6181b 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -84,7 +84,7 @@ public abstract class FlinkCatalogTestBase extends FlinkTestBase {
protected final String[] baseNamespace;
protected final Catalog validationCatalog;
protected final SupportsNamespaces validationNamespaceCatalog;
- private final Map<String, String> config = Maps.newHashMap();
+ protected final Map<String, String> config = Maps.newHashMap();
protected final String flinkDatabase;
protected final Namespace icebergNamespace;
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
new file mode 100644
index 0000000..3291934
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.flink.FlinkCatalogFactory.CACHE_ENABLED;
+
+public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
+
+ private String tableName = "test_table";
+
+ private final FileFormat format;
+
+ @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, cacheEnabled={3}")
+ public static Iterable<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+ for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) {
+ for (Boolean cacheEnabled : new Boolean[] {true, false}) {
+ for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+ String catalogName = (String) catalogParams[0];
+ String[] baseNamespace = (String[]) catalogParams[1];
+ parameters.add(new Object[] {catalogName, baseNamespace, format, cacheEnabled});
+ }
+ }
+ }
+ return parameters;
+ }
+
+ public TestFlinkCatalogTablePartitions(String catalogName, String[] baseNamespace, FileFormat format,
+ boolean cacheEnabled) {
+ super(catalogName, baseNamespace);
+ this.format = format;
+ config.put(CACHE_ENABLED, String.valueOf(cacheEnabled));
+ }
+
+ @Before
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+ }
+
+ @After
+ public void cleanNamespaces() {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ super.clean();
+ }
+
+ @Test
+ public void testListPartitionsWithUnpartitionedTable() {
+ sql("CREATE TABLE %s (id INT, data VARCHAR) with ('write.format.default'='%s')",
+ tableName, format.name());
+ sql("INSERT INTO %s SELECT 1,'a'", tableName);
+
+ ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
+ FlinkCatalog flinkCatalog = (FlinkCatalog) getTableEnv().getCatalog(catalogName).get();
+ AssertHelpers.assertThrows("Should not list partitions for unpartitioned table.",
+ TableNotPartitionedException.class, () -> flinkCatalog.listPartitions(objectPath));
+ }
+
+ @Test
+ public void testListPartitionsWithPartitionedTable() throws TableNotExistException, TableNotPartitionedException {
+ sql("CREATE TABLE %s (id INT, data VARCHAR) PARTITIONED BY (data) " +
+ "with ('write.format.default'='%s')", tableName, format.name());
+ sql("INSERT INTO %s SELECT 1,'a'", tableName);
+ sql("INSERT INTO %s SELECT 2,'b'", tableName);
+
+ ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
+ FlinkCatalog flinkCatalog = (FlinkCatalog) getTableEnv().getCatalog(catalogName).get();
+ List<CatalogPartitionSpec> list = flinkCatalog.listPartitions(objectPath);
+ Assert.assertEquals("Should have 2 partition", 2, list.size());
+
+ List<CatalogPartitionSpec> expected = Lists.newArrayList();
+ CatalogPartitionSpec partitionSpec1 = new CatalogPartitionSpec(ImmutableMap.of("data", "a"));
+ CatalogPartitionSpec partitionSpec2 = new CatalogPartitionSpec(ImmutableMap.of("data", "b"));
+ expected.add(partitionSpec1);
+ expected.add(partitionSpec2);
+ Assert.assertEquals("Should produce the expected catalog partition specs.", list, expected);
+ }
+}