You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/05/13 20:10:02 UTC
[iceberg] branch master updated: Spark 3.2: Avoid reflection to load metadata tables in SparkTableUtil (#4758)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 68f5529ae Spark 3.2: Avoid reflection to load metadata tables in SparkTableUtil (#4758)
68f5529ae is described below
commit 68f5529aea6a61226700e200cc4e8c14cf516959
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri May 13 13:09:58 2022 -0700
Spark 3.2: Avoid reflection to load metadata tables in SparkTableUtil (#4758)
---
.../java/org/apache/iceberg/spark/Spark3Util.java | 20 --------
.../org/apache/iceberg/spark/SparkTableUtil.java | 57 ++++++++--------------
2 files changed, 20 insertions(+), 57 deletions(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index 3d0b6a6a8..f84e1ae10 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -28,8 +28,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -60,8 +58,6 @@ import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.Pair;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -81,13 +77,11 @@ import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.execution.datasources.FileStatusCache;
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex;
import org.apache.spark.sql.execution.datasources.PartitionDirectory;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import scala.Option;
-import scala.Some;
import scala.collection.JavaConverters;
import scala.collection.immutable.Seq;
@@ -617,20 +611,6 @@ public class Spark3Util {
}
}
- /**
- * Returns a metadata table as a Dataset based on the given Iceberg table.
- *
- * @param spark SparkSession where the Dataset will be created
- * @param table an Iceberg table
- * @param type the type of metadata table
- * @return a Dataset that will read the metadata table
- */
- private static Dataset<Row> loadMetadataTable(SparkSession spark, org.apache.iceberg.Table table,
- MetadataTableType type) {
- Table metadataTable = new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false);
- return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty()));
- }
-
/**
* Returns an Iceberg Table by its name from a Spark V2 Catalog. If cache is enabled in {@link SparkCatalog},
* the {@link TableOperations} of the table may be stale, please refresh the table to get the latest one.
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index e6c90cbcc..b2dbac381 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -37,11 +37,11 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.TableMigrationUtil;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.SerializableConfiguration;
@@ -55,8 +55,10 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.TaskContext;
@@ -67,7 +69,6 @@ import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
-import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -83,6 +84,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function2;
@@ -599,46 +602,26 @@ public class SparkTableUtil {
.run(item -> io.deleteFile(item.path()));
}
- // Attempt to use Spark3 Catalog resolution if available on the path
- private static final DynMethods.UnboundMethod LOAD_METADATA_TABLE = DynMethods.builder("loadMetadataTable")
- .hiddenImpl("org.apache.iceberg.spark.Spark3Util", SparkSession.class, Table.class, MetadataTableType.class)
- .orNoop()
- .build();
-
+ /**
+ * Loads a metadata table.
+ *
+ * @deprecated since 0.14.0, will be removed in 0.15.0;
+ * use {@link #loadMetadataTable(SparkSession, Table, MetadataTableType)}.
+ */
+ @Deprecated
public static Dataset<Row> loadCatalogMetadataTable(SparkSession spark, Table table, MetadataTableType type) {
- Preconditions.checkArgument(!LOAD_METADATA_TABLE.isNoop(), "Cannot find Spark3Util class but Spark3 is in use");
- return LOAD_METADATA_TABLE.asStatic().invoke(spark, table, type);
+ return loadMetadataTable(spark, table, type);
}
public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type) {
- if (spark.version().startsWith("3")) {
- // construct the metadata table instance directly
- Dataset<Row> catalogMetadataTable = loadCatalogMetadataTable(spark, table, type);
- if (catalogMetadataTable != null) {
- return catalogMetadataTable;
- }
- }
-
- String tableName = table.name();
- String tableLocation = table.location();
-
- DataFrameReader dataFrameReader = spark.read().format("iceberg");
- if (tableName.contains("/")) {
- // Hadoop Table or Metadata location passed, load without a catalog
- return dataFrameReader.load(tableName + "#" + type);
- }
+ return loadMetadataTable(spark, table, type, ImmutableMap.of());
+ }
- // Catalog based resolution failed, our catalog may be a non-DatasourceV2 Catalog
- if (tableName.startsWith("hadoop.")) {
- // Try loading by location as Hadoop table without Catalog
- return dataFrameReader.load(tableLocation + "#" + type);
- } else if (tableName.startsWith("hive")) {
- // Try loading by name as a Hive table without Catalog
- return dataFrameReader.load(tableName.replaceFirst("hive\\.", "") + "." + type);
- } else {
- throw new IllegalArgumentException(String.format(
- "Cannot find the metadata table for %s of type %s", tableName, type));
- }
+ public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, MetadataTableType type,
+ Map<String, String> extraOptions) {
+ SparkTable metadataTable = new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false);
+ CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(extraOptions);
+ return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
}
/**