You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/02/19 17:17:35 UTC

[incubator-iceberg] branch master updated: Use DataFile instead of SparkDataFile in SparkTableUtil (#786)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d96944  Use DataFile instead of SparkDataFile in SparkTableUtil (#786)
4d96944 is described below

commit 4d96944679a8ab9aa028299477bbf085621d55b8
Author: Chen, Junjie <ch...@gmail.com>
AuthorDate: Thu Feb 20 01:17:27 2020 +0800

    Use DataFile instead of SparkDataFile in SparkTableUtil (#786)
    
    
    Fixes #763
---
 .../org/apache/iceberg/spark/SparkTableUtil.scala  | 220 ++++++---------------
 1 file changed, 60 insertions(+), 160 deletions(-)

diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index c14bb6a..eaf405a 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -20,8 +20,6 @@
 package org.apache.iceberg.spark
 
 import com.google.common.collect.Maps
-import java.nio.ByteBuffer
-import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile, ManifestWriter}
@@ -153,14 +151,15 @@ object SparkTableUtil {
    * @param partition a partition
    * @param conf a serializable Hadoop conf
    * @param metricsConfig a metrics conf
-   * @return a Seq of [[SparkDataFile]]
+   * @return a Seq of [[DataFile]]
    */
   def listPartition(
       partition: SparkPartition,
+      spec: PartitionSpec,
       conf: SerializableConfiguration,
-      metricsConfig: MetricsConfig): Seq[SparkDataFile] = {
+      metricsConfig: MetricsConfig): Seq[DataFile] = {
 
-    listPartition(partition.values, partition.uri, partition.format, conf.get(), metricsConfig)
+    listPartition(partition.values, partition.uri, partition.format, spec, conf.get(), metricsConfig)
   }
 
   /**
@@ -174,22 +173,23 @@ object SparkTableUtil {
    * @param format partition format, avro or parquet
    * @param conf a Hadoop conf
    * @param metricsConfig a metrics conf
-   * @return a seq of [[SparkDataFile]]
+   * @return a seq of [[DataFile]]
    */
   def listPartition(
       partition: Map[String, String],
       uri: String,
       format: String,
+      spec: PartitionSpec,
       conf: Configuration = new Configuration(),
-      metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[SparkDataFile] = {
+      metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[DataFile] = {
 
     if (format.contains("avro")) {
-      listAvroPartition(partition, uri, conf)
+      listAvroPartition(partition, uri, spec, conf)
     } else if (format.contains("parquet")) {
-      listParquetPartition(partition, uri, conf, metricsConfig)
+      listParquetPartition(partition, uri, spec, conf, metricsConfig)
     } else if (format.contains("orc")) {
       // TODO: use MetricsConfig in listOrcPartition
-      listOrcPartition(partition, uri, conf)
+      listOrcPartition(partition, uri, spec, conf)
     } else {
       throw new UnsupportedOperationException(s"Unknown partition format: $format")
     }
@@ -200,113 +200,6 @@ object SparkTableUtil {
    */
   case class SparkPartition(values: Map[String, String], uri: String, format: String)
 
-  /**
-   * Case class representing a data file.
-   */
-  case class SparkDataFile(
-      path: String,
-      partition: collection.Map[String, String],
-      format: String,
-      fileSize: Long,
-      rowGroupSize: Long,
-      rowCount: Long,
-      columnSizes: Array[Long],
-      valueCounts: Array[Long],
-      nullValueCounts: Array[Long],
-      lowerBounds: Seq[Array[Byte]],
-      upperBounds: Seq[Array[Byte]]
-    ) {
-
-    /**
-     * Convert this to a [[DataFile]] that can be added to a [[org.apache.iceberg.Table]].
-     *
-     * @param spec a [[PartitionSpec]] that will be used to parse the partition key
-     * @return a [[DataFile]] that can be passed to [[org.apache.iceberg.AppendFiles]]
-     */
-    def toDataFile(spec: PartitionSpec): DataFile = {
-      // values are strings, so pass a path to let the builder coerce to the right types
-      val partitionKey = spec.fields.asScala.map(_.name).map { name =>
-        s"$name=${partition(name)}"
-      }.mkString("/")
-
-      DataFiles.builder(spec)
-        .withPath(path)
-        .withFormat(format)
-        .withFileSizeInBytes(fileSize)
-        .withMetrics(new Metrics(rowCount,
-          arrayToMap(columnSizes),
-          arrayToMap(valueCounts),
-          arrayToMap(nullValueCounts),
-          arrayToMap(lowerBounds),
-          arrayToMap(upperBounds)))
-        .withPartitionPath(partitionKey)
-        .build()
-    }
-  }
-
-  private def bytesMapToArray(map: java.util.Map[Integer, ByteBuffer]): Seq[Array[Byte]] = {
-    if (map != null && !map.isEmpty) {
-      val keys = map.keySet.asScala
-      val max = keys.max
-      val arr = Array.fill(max + 1)(null.asInstanceOf[Array[Byte]])
-
-      keys.foreach { key =>
-        val buffer = map.get(key)
-
-        val copy = if (buffer.hasArray) {
-          val bytes = buffer.array()
-          if (buffer.arrayOffset() == 0 && buffer.position() == 0 &&
-              bytes.length == buffer.remaining()) {
-            bytes
-          } else {
-            val start = buffer.arrayOffset() + buffer.position()
-            val end = start + buffer.remaining()
-            util.Arrays.copyOfRange(bytes, start, end);
-          }
-        } else {
-          val bytes = Array.fill(buffer.remaining())(0.asInstanceOf[Byte])
-          buffer.get(bytes)
-          bytes
-        }
-
-        arr.update(key, copy)
-      }
-
-      arr
-    } else {
-      null
-    }
-  }
-
-  private def mapToArray(map: java.util.Map[Integer, java.lang.Long]): Array[Long] = {
-    if (map != null && !map.isEmpty) {
-      val keys = map.keySet.asScala
-      val max = keys.max
-      val arr = Array.fill(max + 1)(-1L)
-
-      keys.foreach { key =>
-        arr.update(key, map.get(key))
-      }
-
-      arr
-    } else {
-      null
-    }
-  }
-
-  private def arrayToMap(arr: Seq[Array[Byte]]): java.util.Map[Integer, ByteBuffer] = {
-    if (arr != null) {
-      val map: java.util.Map[Integer, ByteBuffer] = Maps.newHashMap()
-      arr.zipWithIndex.foreach {
-        case (null, _) => // skip
-        case (value, index) => map.put(index, ByteBuffer.wrap(value))
-      }
-      map
-    } else {
-      null
-    }
-  }
-
   private def arrayToMap(arr: Array[Long]): java.util.Map[Integer, java.lang.Long] = {
     if (arr != null) {
       val map: java.util.Map[Integer, java.lang.Long] = Maps.newHashMap()
@@ -329,21 +222,24 @@ object SparkTableUtil {
   private def listAvroPartition(
       partitionPath: Map[String, String],
       partitionUri: String,
-      conf: Configuration): Seq[SparkDataFile] = {
+      spec: PartitionSpec,
+      conf: Configuration): Seq[DataFile] = {
     val partition = new Path(partitionUri)
     val fs = partition.getFileSystem(conf)
 
     fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
-      SparkDataFile(
-        stat.getPath.toString,
-        partitionPath, "avro", stat.getLen,
-        stat.getBlockSize,
-        -1,
-        null,
-        null,
-        null,
-        null,
-        null)
+      val metrics = new Metrics(-1L, arrayToMap(null), arrayToMap(null), arrayToMap(null))
+      val partitionKey = spec.fields.asScala.map(_.name).map { name =>
+        s"$name=${partitionPath(name)}"
+      }.mkString("/")
+
+      DataFiles.builder(spec)
+        .withPath(stat.getPath.toString)
+        .withFormat("avro")
+        .withFileSizeInBytes(stat.getLen)
+        .withMetrics(metrics)
+        .withPartitionPath(partitionKey)
+        .build()
     }
   }
 
@@ -351,48 +247,49 @@ object SparkTableUtil {
   private def listParquetPartition(
       partitionPath: Map[String, String],
       partitionUri: String,
+      spec: PartitionSpec,
       conf: Configuration,
-      metricsSpec: MetricsConfig): Seq[SparkDataFile] = {
+      metricsSpec: MetricsConfig): Seq[DataFile] = {
     val partition = new Path(partitionUri)
     val fs = partition.getFileSystem(conf)
 
     fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
       val metrics = ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat), metricsSpec)
+      val partitionKey = spec.fields.asScala.map(_.name).map { name =>
+        s"$name=${partitionPath(name)}"
+      }.mkString("/")
 
-      SparkDataFile(
-        stat.getPath.toString,
-        partitionPath, "parquet", stat.getLen,
-        stat.getBlockSize,
-        metrics.recordCount,
-        mapToArray(metrics.columnSizes),
-        mapToArray(metrics.valueCounts),
-        mapToArray(metrics.nullValueCounts),
-        bytesMapToArray(metrics.lowerBounds),
-        bytesMapToArray(metrics.upperBounds))
+      DataFiles.builder(spec)
+        .withPath(stat.getPath.toString)
+        .withFormat("parquet")
+        .withFileSizeInBytes(stat.getLen)
+        .withMetrics(metrics)
+        .withPartitionPath(partitionKey)
+        .build()
     }
   }
 
   private def listOrcPartition(
       partitionPath: Map[String, String],
       partitionUri: String,
-      conf: Configuration): Seq[SparkDataFile] = {
+      spec: PartitionSpec,
+      conf: Configuration): Seq[DataFile] = {
     val partition = new Path(partitionUri)
     val fs = partition.getFileSystem(conf)
 
     fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
       val metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath, conf))
+      val partitionKey = spec.fields.asScala.map(_.name).map { name =>
+        s"$name=${partitionPath(name)}"
+      }.mkString("/")
 
-      SparkDataFile(
-        stat.getPath.toString,
-        partitionPath, "orc", stat.getLen,
-        stat.getBlockSize,
-        metrics.recordCount,
-        mapToArray(metrics.columnSizes),
-        mapToArray(metrics.valueCounts),
-        mapToArray(metrics.nullValueCounts),
-        bytesMapToArray(metrics.lowerBounds()),
-        bytesMapToArray(metrics.upperBounds())
-      )
+      DataFiles.builder(spec)
+        .withPath(stat.getPath.toString)
+        .withFormat("orc")
+        .withFileSizeInBytes(stat.getLen)
+        .withMetrics(metrics)
+        .withPartitionPath(partitionKey)
+        .build()
     }
   }
 
@@ -421,7 +318,7 @@ object SparkTableUtil {
   private def buildManifest(
       conf: SerializableConfiguration,
       spec: PartitionSpec,
-      basePath: String): Iterator[SparkDataFile] => Iterator[ManifestFile] = { files =>
+      basePath: String): Iterator[DataFile] => Iterator[ManifestFile] = { files =>
     if (files.hasNext) {
       val io = new HadoopFileIO(conf.get())
       val ctx = TaskContext.get()
@@ -429,9 +326,7 @@ object SparkTableUtil {
       val outputFile = io.newOutputFile(FileFormat.AVRO.addExtension(location.toString))
       val writer = ManifestWriter.write(spec, outputFile)
       try {
-        files.foreach { file =>
-          writer.add(file.toDataFile(spec))
-        }
+        files.foreach(writer.add)
       } finally {
         writer.close()
       }
@@ -489,13 +384,15 @@ object SparkTableUtil {
     val format = sourceTable.storage.serde.orElse(sourceTable.provider)
     require(format.nonEmpty, "Could not determine table format")
 
+    val partition = Map.empty[String, String]
+    val spec = PartitionSpec.unpartitioned()
     val conf = spark.sessionState.newHadoopConf()
     val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)
 
-    val files = listPartition(Map.empty, sourceTable.location.toString, format.get, conf, metricsConfig)
+    val files = listPartition(partition, sourceTable.location.toString, format.get, spec, conf, metricsConfig)
 
     val append = targetTable.newAppend()
-    files.foreach(file => append.appendFile(file.toDataFile(PartitionSpec.unpartitioned)))
+    files.foreach(append.appendFile)
     append.commit()
   }
 
@@ -516,6 +413,8 @@ object SparkTableUtil {
       stagingDir: String): Unit = {
 
     implicit val manifestFileEncoder: Encoder[ManifestFile] = Encoders.javaSerialization[ManifestFile]
+    implicit val dataFileEncoder: Encoder[DataFile] = Encoders.javaSerialization[DataFile]
+    implicit val pathDataFileEncoder: Encoder[(String, DataFile)] = Encoders.tuple(Encoders.STRING, dataFileEncoder)
 
     import spark.implicits._
 
@@ -527,10 +426,11 @@ object SparkTableUtil {
     val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)
 
     val manifests = partitionDS
-      .flatMap(partition => listPartition(partition, serializableConf, metricsConfig))
+      .flatMap(partition => listPartition(partition, spec, serializableConf, metricsConfig))
       .repartition(numShufflePartitions)
-      .orderBy($"path")
-      .mapPartitions(buildManifest(serializableConf, spec, stagingDir))
+      .map(file => (file.path.toString, file))
+      .orderBy($"_1")
+      .mapPartitions(files => buildManifest(serializableConf, spec, stagingDir)(files.map(_._2)))
       .collect()
 
     try {