Posted to commits@iceberg.apache.org by ru...@apache.org on 2021/04/27 14:54:52 UTC

[iceberg] branch master updated: Core: Extract listPartition methods from SparkTableUtil into TableMigrationUtil class in the data module. (#2494)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b3fc299  Core: Extract listPartition methods from SparkTableUtil into TableMigrationUtil class in the data module. (#2494)
b3fc299 is described below

commit b3fc2999601387481a3cea4737129527b1304c3b
Author: László Pintér <47...@users.noreply.github.com>
AuthorDate: Tue Apr 27 16:54:27 2021 +0200

    Core: Extract listPartition methods from SparkTableUtil into TableMigrationUtil class in the data module. (#2494)
    
    Extract the listPartition methods from SparkTableUtil into a new TableMigrationUtil class in the data
    module so that they can be reused by other frameworks.
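    
    As a minimal sketch (illustrative schema, location, and partition values; not part of this
    commit), a non-Spark framework could call the extracted utility like this:
    
        // imports: org.apache.iceberg.{Schema, PartitionSpec, DataFile, MetricsConfig},
        //          org.apache.iceberg.types.Types, org.apache.iceberg.data.TableMigrationUtil,
        //          org.apache.hadoop.conf.Configuration, java.util.List,
        //          org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "ds", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("ds").build();
    
        List<DataFile> files = TableMigrationUtil.listPartition(
            ImmutableMap.of("ds", "2021-04-27"),          // partition values keyed by field name
            "hdfs://nn/warehouse/db/tbl/ds=2021-04-27",   // partition location URI (hypothetical)
            "parquet",                                    // file format of the partition
            spec,
            new Configuration(),                          // Hadoop conf
            MetricsConfig.getDefault(),
            null);                                        // no name mapping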
---
 .../apache/iceberg/data/TableMigrationUtil.java    | 180 +++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 170 ++-----------------
 2 files changed, 190 insertions(+), 160 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
new file mode 100644
index 0000000..2750847
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+public class TableMigrationUtil {
+
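+  // Ignore hidden files and directories, e.g. "_SUCCESS" or ".part.crc", when listing a partition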
+  private static final PathFilter HIDDEN_PATH_FILTER =
+      p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
+
+  private TableMigrationUtil() {
+  }
+
+  /**
+   * Returns the data files in a partition by listing the partition location.
+   * <p>
+   * For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
+   * metrics are set to null.
+   * <p>
+   * Note: certain metrics, like NaN counts, which are supported by Iceberg file writers but not stored in file
+   * footers, will not be populated.
+   *
+   * @param partition     partition key, e.g., "a=1/b=2"
+   * @param uri           partition location URI
+   * @param format        partition format: avro, parquet, or orc
+   * @param spec          a partition spec
+   * @param conf          a Hadoop conf
+   * @param metricsConfig a metrics conf
+   * @param mapping       a name mapping
+   * @return a List of DataFile
+   */
+  public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
+                                             PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
+                                             NameMapping mapping) {
+    if (format.contains("avro")) {
+      return listAvroPartition(partition, uri, spec, conf);
+    } else if (format.contains("parquet")) {
+      return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
+    } else if (format.contains("orc")) {
+      return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
+    } else {
+      throw new UnsupportedOperationException("Unknown partition format: " + format);
+    }
+  }
+
+  private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
+                                                  PartitionSpec spec, Configuration conf) {
+    try {
+      Path partition = new Path(partitionUri);
+      FileSystem fs = partition.getFileSystem(conf);
+      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+          .filter(FileStatus::isFile)
+          .map(stat -> {
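+            // Avro files carry no footer statistics, so use placeholder metrics with an unknown (-1) row count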
+            Metrics metrics = new Metrics(-1L, null, null, null);
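+            // Rebuild the Hive-style partition path, e.g. "a=1/b=2", in the spec's partition-field order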
+            String partitionKey = spec.fields().stream()
+                .map(PartitionField::name)
+                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+                .collect(Collectors.joining("/"));
+
+            return DataFiles.builder(spec)
+                .withPath(stat.getPath().toString())
+                .withFormat("avro")
+                .withFileSizeInBytes(stat.getLen())
+                .withMetrics(metrics)
+                .withPartitionPath(partitionKey)
+                .build();
+
+          }).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+    }
+  }
+
+  private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
+                                                     PartitionSpec spec, Configuration conf,
+                                                     MetricsConfig metricsSpec, NameMapping mapping) {
+    try {
+      Path partition = new Path(partitionUri);
+      FileSystem fs = partition.getFileSystem(conf);
+
+      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+          .filter(FileStatus::isFile)
+          .map(stat -> {
+            Metrics metrics;
+            try {
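+              // Read column-level statistics from the Parquet footer and convert them to Iceberg metrics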
+              ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
+              metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+            } catch (IOException e) {
+              throw new RuntimeException("Unable to read the footer of the parquet file: " +
+                  stat.getPath(), e);
+            }
+            String partitionKey = spec.fields().stream()
+                .map(PartitionField::name)
+                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+                .collect(Collectors.joining("/"));
+
+            return DataFiles.builder(spec)
+                .withPath(stat.getPath().toString())
+                .withFormat("parquet")
+                .withFileSizeInBytes(stat.getLen())
+                .withMetrics(metrics)
+                .withPartitionPath(partitionKey)
+                .build();
+          }).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+    }
+  }
+
+  private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
+                                                 PartitionSpec spec, Configuration conf,
+                                                 MetricsConfig metricsSpec, NameMapping mapping) {
+    try {
+      Path partition = new Path(partitionUri);
+      FileSystem fs = partition.getFileSystem(conf);
+
+      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+          .filter(FileStatus::isFile)
+          .map(stat -> {
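+            // ORC keeps column statistics in the file footer; OrcMetrics reads them into Iceberg metrics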
+            Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
+                metricsSpec, mapping);
+            String partitionKey = spec.fields().stream()
+                .map(PartitionField::name)
+                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+                .collect(Collectors.joining("/"));
+
+            return DataFiles.builder(spec)
+                .withPath(stat.getPath().toString())
+                .withFormat("orc")
+                .withFileSizeInBytes(stat.getLen())
+                .withMetrics(metrics)
+                .withPartitionPath(partitionKey)
+                .build();
+
+          }).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index d03f96c..c9bfd49 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -22,41 +22,32 @@ package org.apache.iceberg.spark;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
-import org.apache.iceberg.Metrics;
 import org.apache.iceberg.MetricsConfig;
-import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.TableMigrationUtil;
 import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.OrcMetrics;
-import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
@@ -65,8 +56,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -255,12 +244,15 @@ public class SparkTableUtil {
    *
    * For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
    * metrics are set to null.
+   * @deprecated use {@link TableMigrationUtil#listPartition(Map, String, String, PartitionSpec, Configuration,
+   * MetricsConfig, NameMapping)}
    *
    * @param partition a partition
    * @param conf a serializable Hadoop conf
    * @param metricsConfig a metrics conf
    * @return a List of DataFile
    */
+  @Deprecated
   public static List<DataFile> listPartition(SparkPartition partition, PartitionSpec spec,
                                              SerializableConfiguration conf, MetricsConfig metricsConfig) {
     return listPartition(partition, spec, conf, metricsConfig, null);
@@ -271,6 +263,8 @@ public class SparkTableUtil {
    *
    * For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
    * metrics are set to null.
+   * @deprecated use {@link TableMigrationUtil#listPartition(Map, String, String, PartitionSpec, Configuration,
+   * MetricsConfig, NameMapping)}
    *
    * @param partition a partition
    * @param conf a serializable Hadoop conf
@@ -278,158 +272,14 @@ public class SparkTableUtil {
    * @param mapping a name mapping
    * @return a List of DataFile
    */
+  @Deprecated
   public static List<DataFile> listPartition(SparkPartition partition, PartitionSpec spec,
                                              SerializableConfiguration conf, MetricsConfig metricsConfig,
                                              NameMapping mapping) {
-    return listPartition(partition.values, partition.uri, partition.format, spec, conf.get(), metricsConfig, mapping);
+    return TableMigrationUtil.listPartition(partition.values, partition.uri, partition.format, spec, conf.get(),
+        metricsConfig, mapping);
   }
 
-  /**
-   * Returns the data files in a partition by listing the partition location.
-   *
-   * For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
-   * metrics are set to null.
-   *
-   * @param partition partition key, e.g., "a=1/b=2"
-   * @param uri partition location URI
-   * @param format partition format, avro or parquet
-   * @param spec a partition spec
-   * @param conf a Hadoop conf
-   * @param metricsConfig a metrics conf
-   * @return a List of DataFile
-   */
-  public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
-                                             PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig) {
-    return listPartition(partition, uri, format, spec, conf, metricsConfig, null);
-  }
-
-  /**
-   * Returns the data files in a partition by listing the partition location.
-   * <p>
-   * For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
-   * metrics are set to null.
-   * <p>
-   * Note: certain metrics, like NaN counts, that are only supported by iceberg file writers but not file footers, will
-   * not be populated.
-   *
-   * @param partition partition key, e.g., "a=1/b=2"
-   * @param uri partition location URI
-   * @param format partition format, avro or parquet
-   * @param spec a partition spec
-   * @param conf a Hadoop conf
-   * @param metricsConfig a metrics conf
-   * @param mapping a name mapping
-   * @return a List of DataFile
-   */
-  public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
-                                             PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
-                                             NameMapping mapping) {
-    if (format.contains("avro")) {
-      return listAvroPartition(partition, uri, spec, conf);
-    } else if (format.contains("parquet")) {
-      return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
-    } else if (format.contains("orc")) {
-      return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
-    } else {
-      throw new UnsupportedOperationException("Unknown partition format: " + format);
-    }
-  }
-
-  private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
-                                                  PartitionSpec spec, Configuration conf) {
-    try {
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            Metrics metrics = new Metrics(-1L, null, null, null);
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("avro")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files in partition: %s", partitionUri);
-    }
-  }
-
-  private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
-                                                     PartitionSpec spec, Configuration conf,
-                                                     MetricsConfig metricsSpec, NameMapping mapping) {
-    try {
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
-
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            Metrics metrics;
-            try {
-              ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
-              metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
-            } catch (IOException e) {
-              throw SparkExceptionUtil.toUncheckedException(
-                  e, "Unable to read the footer of the parquet file: %s", stat.getPath());
-            }
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("parquet")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files in partition: %s", partitionUri);
-    }
-  }
-
-  private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
-                                                 PartitionSpec spec, Configuration conf,
-                                                 MetricsConfig metricsSpec, NameMapping mapping) {
-    try {
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
-
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
-                metricsSpec, mapping);
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("orc")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files in partition: %s", partitionUri);
-    }
-  }
 
   private static SparkPartition toSparkPartition(CatalogTablePartition partition, CatalogTable table) {
     Option<URI> locationUri = partition.storage().locationUri();
@@ -572,7 +422,7 @@ public class SparkTableUtil {
       String nameMappingString = targetTable.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
       NameMapping nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
 
-      List<DataFile> files = listPartition(
+      List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
       AppendFiles append = targetTable.newAppend();