Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2019/11/09 01:37:00 UTC

[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #624: Update SparkTableUtil to use SessionCatalog and proper MetricsConfig

URL: https://github.com/apache/incubator-iceberg/pull/624#discussion_r344421841
 
 

 ##########
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##########
 @@ -27,86 +27,153 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile, ManifestWriter}
 import org.apache.iceberg.{Metrics, MetricsConfig, PartitionSpec, Table}
 import org.apache.iceberg.exceptions.NoSuchTableException
-import org.apache.iceberg.hadoop.{HadoopFileIO, HadoopInputFile, HadoopTables, SerializableConfiguration}
+import org.apache.iceberg.hadoop.{HadoopFileIO, HadoopInputFile, SerializableConfiguration}
 import org.apache.iceberg.orc.OrcMetrics
 import org.apache.iceberg.parquet.ParquetUtil
-import org.apache.iceberg.spark.hacks.Hive
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import scala.collection.JavaConverters._
+import scala.util.Try
 
 object SparkTableUtil {
   /**
-   * Returns a DataFrame with a row for each partition in the table.
-   *
-   * The DataFrame has 3 columns, partition key (a=1/b=2), partition location, and format
-   * (avro or parquet).
+   * Returns all partitions in the table.
    *
    * @param spark a Spark session
    * @param table a table name and (optional) database
-   * @return a DataFrame of the table's partitions
+   * @return all of the table's partitions
    */
-  def partitionDF(spark: SparkSession, table: String): DataFrame = {
-    import spark.implicits._
+  def getPartitions(spark: SparkSession, table: String): Seq[SparkPartition] = {
+    val tableIdentifier = spark.sessionState.sqlParser.parseTableIdentifier(table)
+    getPartitions(spark, tableIdentifier)
+  }
 
-    val partitions: Seq[(Map[String, String], Option[String], Option[String])] =
-      Hive.partitions(spark, table).map { p: CatalogTablePartition =>
-        (p.spec, p.storage.locationUri.map(String.valueOf(_)), p.storage.serde)
-      }
+  /**
+   * Returns all partitions in the table.
+   *
+   * @param spark a Spark session
+   * @param tableIdent a table identifier
+   * @return all of the table's partitions
+   */
+  def getPartitions(spark: SparkSession, tableIdent: TableIdentifier): Seq[SparkPartition] = {
+    val catalog = spark.sessionState.catalog
+    val catalogTable = catalog.getTableMetadata(tableIdent)
 
-    partitions.toDF("partition", "uri", "format")
+    catalog
+      .listPartitions(tableIdent)
+      .map(catalogPartition => toSparkPartition(catalogPartition, catalogTable))
   }
 
   /**
-    * Returns a DataFrame with a row for each partition that matches the specified 'expression'.
-    *
-    * @param spark a Spark session.
-    * @param table name of the table.
-    * @param expression The expression whose matching partitions are returned.
-    * @return a DataFrame of the table partitions.
-    */
-  def partitionDFByFilter(spark: SparkSession, table: String, expression: String): DataFrame = {
-    import spark.implicits._
+   * Returns partitions that match the specified 'predicate'.
+   *
+   * @param spark a Spark session
+   * @param table a table name and (optional) database
+   * @param predicate a predicate on partition columns
+   * @return the table's partitions that match the predicate
+   */
+  def getPartitionsByFilter(spark: SparkSession, table: String, predicate: String): Seq[SparkPartition] = {
+    val tableIdentifier = spark.sessionState.sqlParser.parseTableIdentifier(table)
+
+    val unresolvedPredicateExpr = spark.sessionState.sqlParser.parseExpression(predicate)
+    val resolver = spark.sessionState.analyzer.resolver
+    val plan = spark.table(table).queryExecution.analyzed
+    val resolvedPredicateExpr = unresolvedPredicateExpr.transform {
+      case attr: UnresolvedAttribute =>
+        plan.resolve(attr.nameParts, resolver) match {
+          case Some(resolvedAttr) => resolvedAttr
+          case None => throw new IllegalArgumentException(s"Could not resolve $attr using columns: ${plan.output}")
+        }
+    }
 
-    val expr = spark.sessionState.sqlParser.parseExpression(expression)
-    val partitions: Seq[(Map[String, String], Option[String], Option[String])] =
-      Hive.partitionsByFilter(spark, table, expr).map { p: CatalogTablePartition =>
-        (p.spec, p.storage.locationUri.map(String.valueOf(_)), p.storage.serde)
-      }
+    getPartitionsByFilter(spark, tableIdentifier, resolvedPredicateExpr)
+  }
+
+  /**
+   * Returns partitions that match the specified 'predicate'.
+   *
+   * @param spark a Spark session
+   * @param tableIdent a table identifier
+   * @param predicate a predicate on partition columns
+   * @return the table's partitions that match the predicate
+   */
+  def getPartitionsByFilter(
+      spark: SparkSession,
+      tableIdent: TableIdentifier,
+      predicate: Expression): Seq[SparkPartition] = {
+
+    if (!predicate.resolved) {
+      throw new IllegalArgumentException(s"$predicate is not resolved")
 
 Review comment:
   Why not put the resolving logic in this function?
