Posted to commits@iceberg.apache.org by ru...@apache.org on 2021/04/27 14:54:52 UTC
[iceberg] branch master updated: Core: Extract listPartition methods from SparkTableUtil into TableMigrationUtil class in core module. (#2494)
This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b3fc299 Core: Extract listPartition methods from SparkTableUtil into TableMigrationUtil class in core module. (#2494)
b3fc299 is described below
commit b3fc2999601387481a3cea4737129527b1304c3b
Author: László Pintér <47...@users.noreply.github.com>
AuthorDate: Tue Apr 27 16:54:27 2021 +0200
Core: Extract listPartition methods from SparkTableUtil into TableMigrationUtil class in core module. (#2494)
Extract listPartition methods from SparkTableUtil into TableMigrationUtil class in core module for use with
other frameworks.
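For frameworks other than Spark, the extracted utility can be called directly. A minimal usage sketch, assuming a Hive-style partition a=1/b=2 of Parquet files; the class, method, and values below are illustrative placeholders, not part of this commit:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.MetricsConfig;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.TableMigrationUtil;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    public class ListPartitionExample {
      // Lists the data files of one Hive-style partition of an existing table.
      static List<DataFile> listOnePartition(Table table, String partitionUri) {
        return TableMigrationUtil.listPartition(
            ImmutableMap.of("a", "1", "b", "2"),              // partition values keyed by field name
            partitionUri,                                      // e.g. "hdfs://nn/warehouse/t/a=1/b=2"
            "parquet",                                         // selects the Parquet listing path
            table.spec(),                                      // partition spec of the target table
            new Configuration(),                               // Hadoop conf used to resolve the FileSystem
            MetricsConfig.fromProperties(table.properties()),  // per-column metrics configuration
            null);                                             // no name mapping
      }
    }

Metrics are read from each Parquet footer; passing "avro" as the format would skip footer metrics, as the new class's Javadoc notes.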
---
.../apache/iceberg/data/TableMigrationUtil.java | 180 +++++++++++++++++++++
.../org/apache/iceberg/spark/SparkTableUtil.java | 170 ++-----------------
2 files changed, 190 insertions(+), 160 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
new file mode 100644
index 0000000..2750847
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+public class TableMigrationUtil {
+
+ private static final PathFilter HIDDEN_PATH_FILTER =
+ p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
+
+ private TableMigrationUtil() {
+ }
+
+ /**
+ * Returns the data files in a partition by listing the partition location.
+ * <p>
+ * For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
+ * metrics are set to null.
+ * <p>
+ * Note: certain metrics, like NaN counts, which are supported by Iceberg file writers but not stored in file
+ * footers, will not be populated.
+ *
+ * @param partition partition key, e.g., "a=1/b=2"
+ * @param uri partition location URI
+ * @param format partition format, avro, parquet or orc
+ * @param spec a partition spec
+ * @param conf a Hadoop conf
+ * @param metricsConfig a metrics conf
+ * @param mapping a name mapping
+ * @return a List of DataFile
+ */
+ public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
+ PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
+ NameMapping mapping) {
+ if (format.contains("avro")) {
+ return listAvroPartition(partition, uri, spec, conf);
+ } else if (format.contains("parquet")) {
+ return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
+ } else if (format.contains("orc")) {
+ return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
+ } else {
+ throw new UnsupportedOperationException("Unknown partition format: " + format);
+ }
+ }
+
+ private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
+ PartitionSpec spec, Configuration conf) {
+ try {
+ Path partition = new Path(partitionUri);
+ FileSystem fs = partition.getFileSystem(conf);
+ return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .map(stat -> {
+ Metrics metrics = new Metrics(-1L, null, null, null);
+ String partitionKey = spec.fields().stream()
+ .map(PartitionField::name)
+ .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+ .collect(Collectors.joining("/"));
+
+ return DataFiles.builder(spec)
+ .withPath(stat.getPath().toString())
+ .withFormat("avro")
+ .withFileSizeInBytes(stat.getLen())
+ .withMetrics(metrics)
+ .withPartitionPath(partitionKey)
+ .build();
+
+ }).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+ }
+ }
+
+ private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
+ PartitionSpec spec, Configuration conf,
+ MetricsConfig metricsSpec, NameMapping mapping) {
+ try {
+ Path partition = new Path(partitionUri);
+ FileSystem fs = partition.getFileSystem(conf);
+
+ return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .map(stat -> {
+ Metrics metrics;
+ try {
+ ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
+ metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to read the footer of the parquet file: " +
+ stat.getPath(), e);
+ }
+ String partitionKey = spec.fields().stream()
+ .map(PartitionField::name)
+ .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+ .collect(Collectors.joining("/"));
+
+ return DataFiles.builder(spec)
+ .withPath(stat.getPath().toString())
+ .withFormat("parquet")
+ .withFileSizeInBytes(stat.getLen())
+ .withMetrics(metrics)
+ .withPartitionPath(partitionKey)
+ .build();
+ }).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+ }
+ }
+
+ private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
+ PartitionSpec spec, Configuration conf,
+ MetricsConfig metricsSpec, NameMapping mapping) {
+ try {
+ Path partition = new Path(partitionUri);
+ FileSystem fs = partition.getFileSystem(conf);
+
+ return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .map(stat -> {
+ Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
+ metricsSpec, mapping);
+ String partitionKey = spec.fields().stream()
+ .map(PartitionField::name)
+ .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+ .collect(Collectors.joining("/"));
+
+ return DataFiles.builder(spec)
+ .withPath(stat.getPath().toString())
+ .withFormat("orc")
+ .withFileSizeInBytes(stat.getLen())
+ .withMetrics(metrics)
+ .withPartitionPath(partitionKey)
+ .build();
+
+ }).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+ }
+ }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index d03f96c..c9bfd49 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -22,41 +22,32 @@ package org.apache.iceberg.spark;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
-import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
-import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.TableMigrationUtil;
import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.OrcMetrics;
-import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
@@ -65,8 +56,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -255,12 +244,15 @@ public class SparkTableUtil {
*
* For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
* metrics are set to null.
+ * @deprecated use {@link TableMigrationUtil#listPartition(Map, String, String, PartitionSpec, Configuration,
+ * MetricsConfig, NameMapping)}
*
* @param partition a partition
* @param conf a serializable Hadoop conf
* @param metricsConfig a metrics conf
* @return a List of DataFile
*/
+ @Deprecated
public static List<DataFile> listPartition(SparkPartition partition, PartitionSpec spec,
SerializableConfiguration conf, MetricsConfig metricsConfig) {
return listPartition(partition, spec, conf, metricsConfig, null);
@@ -271,6 +263,8 @@ public class SparkTableUtil {
*
* For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
* metrics are set to null.
+ * @deprecated use {@link TableMigrationUtil#listPartition(Map, String, String, PartitionSpec, Configuration,
+ * MetricsConfig, NameMapping)}
*
* @param partition a partition
* @param conf a serializable Hadoop conf
@@ -278,158 +272,14 @@ public class SparkTableUtil {
* @param mapping a name mapping
* @return a List of DataFile
*/
+ @Deprecated
public static List<DataFile> listPartition(SparkPartition partition, PartitionSpec spec,
SerializableConfiguration conf, MetricsConfig metricsConfig,
NameMapping mapping) {
- return listPartition(partition.values, partition.uri, partition.format, spec, conf.get(), metricsConfig, mapping);
+ return TableMigrationUtil.listPartition(partition.values, partition.uri, partition.format, spec, conf.get(),
+ metricsConfig, mapping);
}
- /**
- * Returns the data files in a partition by listing the partition location.
- *
- * For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
- * metrics are set to null.
- *
- * @param partition partition key, e.g., "a=1/b=2"
- * @param uri partition location URI
- * @param format partition format, avro or parquet
- * @param spec a partition spec
- * @param conf a Hadoop conf
- * @param metricsConfig a metrics conf
- * @return a List of DataFile
- */
- public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
- PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig) {
- return listPartition(partition, uri, format, spec, conf, metricsConfig, null);
- }
-
- /**
- * Returns the data files in a partition by listing the partition location.
- * <p>
- * For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
- * metrics are set to null.
- * <p>
- * Note: certain metrics, like NaN counts, that are only supported by iceberg file writers but not file footers, will
- * not be populated.
- *
- * @param partition partition key, e.g., "a=1/b=2"
- * @param uri partition location URI
- * @param format partition format, avro or parquet
- * @param spec a partition spec
- * @param conf a Hadoop conf
- * @param metricsConfig a metrics conf
- * @param mapping a name mapping
- * @return a List of DataFile
- */
- public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
- PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
- NameMapping mapping) {
- if (format.contains("avro")) {
- return listAvroPartition(partition, uri, spec, conf);
- } else if (format.contains("parquet")) {
- return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
- } else if (format.contains("orc")) {
- return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
- } else {
- throw new UnsupportedOperationException("Unknown partition format: " + format);
- }
- }
-
- private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
- PartitionSpec spec, Configuration conf) {
- try {
- Path partition = new Path(partitionUri);
- FileSystem fs = partition.getFileSystem(conf);
- return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
- .filter(FileStatus::isFile)
- .map(stat -> {
- Metrics metrics = new Metrics(-1L, null, null, null);
- String partitionKey = spec.fields().stream()
- .map(PartitionField::name)
- .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
- .collect(Collectors.joining("/"));
-
- return DataFiles.builder(spec)
- .withPath(stat.getPath().toString())
- .withFormat("avro")
- .withFileSizeInBytes(stat.getLen())
- .withMetrics(metrics)
- .withPartitionPath(partitionKey)
- .build();
-
- }).collect(Collectors.toList());
- } catch (IOException e) {
- throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files in partition: %s", partitionUri);
- }
- }
-
- private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
- PartitionSpec spec, Configuration conf,
- MetricsConfig metricsSpec, NameMapping mapping) {
- try {
- Path partition = new Path(partitionUri);
- FileSystem fs = partition.getFileSystem(conf);
-
- return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
- .filter(FileStatus::isFile)
- .map(stat -> {
- Metrics metrics;
- try {
- ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
- metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
- } catch (IOException e) {
- throw SparkExceptionUtil.toUncheckedException(
- e, "Unable to read the footer of the parquet file: %s", stat.getPath());
- }
- String partitionKey = spec.fields().stream()
- .map(PartitionField::name)
- .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
- .collect(Collectors.joining("/"));
-
- return DataFiles.builder(spec)
- .withPath(stat.getPath().toString())
- .withFormat("parquet")
- .withFileSizeInBytes(stat.getLen())
- .withMetrics(metrics)
- .withPartitionPath(partitionKey)
- .build();
-
- }).collect(Collectors.toList());
- } catch (IOException e) {
- throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files in partition: %s", partitionUri);
- }
- }
-
- private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
- PartitionSpec spec, Configuration conf,
- MetricsConfig metricsSpec, NameMapping mapping) {
- try {
- Path partition = new Path(partitionUri);
- FileSystem fs = partition.getFileSystem(conf);
-
- return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
- .filter(FileStatus::isFile)
- .map(stat -> {
- Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
- metricsSpec, mapping);
- String partitionKey = spec.fields().stream()
- .map(PartitionField::name)
- .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
- .collect(Collectors.joining("/"));
-
- return DataFiles.builder(spec)
- .withPath(stat.getPath().toString())
- .withFormat("orc")
- .withFileSizeInBytes(stat.getLen())
- .withMetrics(metrics)
- .withPartitionPath(partitionKey)
- .build();
-
- }).collect(Collectors.toList());
- } catch (IOException e) {
- throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files in partition: %s", partitionUri);
- }
- }
private static SparkPartition toSparkPartition(CatalogTablePartition partition, CatalogTable table) {
Option<URI> locationUri = partition.storage().locationUri();
@@ -572,7 +422,7 @@ public class SparkTableUtil {
String nameMappingString = targetTable.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
NameMapping nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
- List<DataFile> files = listPartition(
+ List<DataFile> files = TableMigrationUtil.listPartition(
partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
AppendFiles append = targetTable.newAppend();
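The DataFile list returned by the utility feeds directly into a table commit, as the import path above shows. A minimal sketch of that last step, assuming a Table named targetTable and the files list from listPartition (names are illustrative):

    import java.util.List;
    import org.apache.iceberg.AppendFiles;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;

    public class CommitFilesExample {
      // Stages the listed data files and commits them as a new snapshot.
      static void commitFiles(Table targetTable, List<DataFile> files) {
        AppendFiles append = targetTable.newAppend();  // open an append operation
        files.forEach(append::appendFile);             // stage each listed data file
        append.commit();                               // atomically produce the new snapshot
      }
    }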