You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/05/13 20:10:02 UTC

[iceberg] branch master updated: Spark 3.2: Avoid reflection to load metadata tables in SparkTableUtil (#4758)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 68f5529ae Spark 3.2: Avoid reflection to load metadata tables in SparkTableUtil (#4758)
68f5529ae is described below

commit 68f5529aea6a61226700e200cc4e8c14cf516959
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri May 13 13:09:58 2022 -0700

    Spark 3.2: Avoid reflection to load metadata tables in SparkTableUtil (#4758)
---
 .../java/org/apache/iceberg/spark/Spark3Util.java  | 20 --------
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 57 ++++++++--------------
 2 files changed, 20 insertions(+), 57 deletions(-)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index 3d0b6a6a8..f84e1ae10 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -28,8 +28,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -60,8 +58,6 @@ import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.Pair;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.CatalystTypeConverters;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -81,13 +77,11 @@ import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.execution.datasources.FileStatusCache;
 import org.apache.spark.sql.execution.datasources.InMemoryFileIndex;
 import org.apache.spark.sql.execution.datasources.PartitionDirectory;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.types.IntegerType;
 import org.apache.spark.sql.types.LongType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import scala.Option;
-import scala.Some;
 import scala.collection.JavaConverters;
 import scala.collection.immutable.Seq;
 
@@ -617,20 +611,6 @@ public class Spark3Util {
     }
   }
 
-  /**
-   * Returns a metadata table as a Dataset based on the given Iceberg table.
-   *
-   * @param spark SparkSession where the Dataset will be created
-   * @param table an Iceberg table
-   * @param type the type of metadata table
-   * @return a Dataset that will read the metadata table
-   */
-  private static Dataset<Row> loadMetadataTable(SparkSession spark, org.apache.iceberg.Table table,
-                                                MetadataTableType type) {
-    Table metadataTable = new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false);
-    return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty()));
-  }
-
   /**
    * Returns an Iceberg Table by its name from a Spark V2 Catalog. If cache is enabled in {@link SparkCatalog},
    * the {@link TableOperations} of the table may be stale, please refresh the table to get the latest one.
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index e6c90cbcc..b2dbac381 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -37,11 +37,11 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.data.TableMigrationUtil;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
@@ -55,8 +55,10 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.TaskContext;
@@ -67,7 +69,6 @@ import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Column;
-import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -83,6 +84,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression;
 import org.apache.spark.sql.catalyst.expressions.NamedExpression;
 import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function2;
@@ -599,46 +602,26 @@ public class SparkTableUtil {
         .run(item -> io.deleteFile(item.path()));
   }
 
-  // Attempt to use Spark3 Catalog resolution if available on the path
-  private static final DynMethods.UnboundMethod LOAD_METADATA_TABLE = DynMethods.builder("loadMetadataTable")
-      .hiddenImpl("org.apache.iceberg.spark.Spark3Util", SparkSession.class, Table.class, MetadataTableType.class)
-      .orNoop()
-      .build();
-
+  /**
+   * Loads a metadata table.
+   *
+   * @deprecated since 0.14.0, will be removed in 0.15.0;
+   *             use {@link #loadMetadataTable(SparkSession, Table, MetadataTableType)}.
+   */
+  @Deprecated
   public static Dataset<Row> loadCatalogMetadataTable(SparkSession spark, Table table, MetadataTableType type) {
-    Preconditions.checkArgument(!LOAD_METADATA_TABLE.isNoop(), "Cannot find Spark3Util class but Spark3 is in use");
-    return LOAD_METADATA_TABLE.asStatic().invoke(spark, table, type);
+    return loadMetadataTable(spark, table, type);
   }
 
   public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type) {
-    if (spark.version().startsWith("3")) {
-      // construct the metadata table instance directly
-      Dataset<Row> catalogMetadataTable = loadCatalogMetadataTable(spark, table, type);
-      if (catalogMetadataTable != null) {
-        return catalogMetadataTable;
-      }
-    }
-
-    String tableName = table.name();
-    String tableLocation = table.location();
-
-    DataFrameReader dataFrameReader = spark.read().format("iceberg");
-    if (tableName.contains("/")) {
-      // Hadoop Table or Metadata location passed, load without a catalog
-      return dataFrameReader.load(tableName + "#" + type);
-    }
+    return loadMetadataTable(spark, table, type, ImmutableMap.of());
+  }
 
-    // Catalog based resolution failed, our catalog may be a non-DatasourceV2 Catalog
-    if (tableName.startsWith("hadoop.")) {
-      // Try loading by location as Hadoop table without Catalog
-      return dataFrameReader.load(tableLocation + "#" + type);
-    } else if (tableName.startsWith("hive")) {
-      // Try loading by name as a Hive table without Catalog
-      return dataFrameReader.load(tableName.replaceFirst("hive\\.", "") + "." + type);
-    } else {
-      throw new IllegalArgumentException(String.format(
-          "Cannot find the metadata table for %s of type %s", tableName, type));
-    }
+  public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type,
+                                               Map<String, String> extraOptions) {
+    SparkTable metadataTable = new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false);
+    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(extraOptions);
+    return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
   }
 
   /**