Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/02/19 17:17:35 UTC
[incubator-iceberg] branch master updated: Use DataFile instead of SparkDataFile in SparkTableUtil (#786)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4d96944 Use DataFile instead of SparkDataFile in SparkTableUtil (#786)
4d96944 is described below
commit 4d96944679a8ab9aa028299477bbf085621d55b8
Author: Chen, Junjie <ch...@gmail.com>
AuthorDate: Thu Feb 20 01:17:27 2020 +0800
Use DataFile instead of SparkDataFile in SparkTableUtil (#786)
Fixes #763
---
.../org/apache/iceberg/spark/SparkTableUtil.scala | 220 ++++++---------------
1 file changed, 60 insertions(+), 160 deletions(-)
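This change drops the SparkDataFile intermediary: the partition listing helpers now take the table's PartitionSpec and build Iceberg DataFile instances directly with DataFiles.builder, so callers append the results without a toDataFile(spec) conversion. A minimal sketch of the new call pattern (the location, format, and table lookup below are illustrative, not taken from the commit):

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.{PartitionSpec, Table}
    import org.apache.iceberg.spark.SparkTableUtil

    // Listing an unpartitioned table's files; listPartition now requires the spec
    // and returns Seq[DataFile] instead of Seq[SparkDataFile].
    val spec = PartitionSpec.unpartitioned()
    val conf = new Configuration()
    val files = SparkTableUtil.listPartition(
      Map.empty[String, String],              // partition values (none here)
      "hdfs://namenode/warehouse/db/tbl",     // table location (hypothetical)
      "parquet",                              // file format
      spec,
      conf)

    // The returned DataFiles go straight into an append.
    val table: Table = ???                    // resolved from a catalog elsewhere
    val append = table.newAppend()
    files.foreach(append.appendFile)
    append.commit()
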
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index c14bb6a..eaf405a 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -20,8 +20,6 @@
package org.apache.iceberg.spark
import com.google.common.collect.Maps
-import java.nio.ByteBuffer
-import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile, ManifestWriter}
@@ -153,14 +151,15 @@ object SparkTableUtil {
* @param partition a partition
* @param conf a serializable Hadoop conf
* @param metricsConfig a metrics conf
- * @return a Seq of [[SparkDataFile]]
+ * @return a Seq of [[DataFile]]
*/
def listPartition(
partition: SparkPartition,
+ spec: PartitionSpec,
conf: SerializableConfiguration,
- metricsConfig: MetricsConfig): Seq[SparkDataFile] = {
+ metricsConfig: MetricsConfig): Seq[DataFile] = {
- listPartition(partition.values, partition.uri, partition.format, conf.get(), metricsConfig)
+ listPartition(partition.values, partition.uri, partition.format, spec, conf.get(), metricsConfig)
}
/**
@@ -174,22 +173,23 @@ object SparkTableUtil {
* @param format partition format, avro or parquet
* @param conf a Hadoop conf
* @param metricsConfig a metrics conf
- * @return a seq of [[SparkDataFile]]
+ * @return a seq of [[DataFile]]
*/
def listPartition(
partition: Map[String, String],
uri: String,
format: String,
+ spec: PartitionSpec,
conf: Configuration = new Configuration(),
- metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[SparkDataFile] = {
+ metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[DataFile] = {
if (format.contains("avro")) {
- listAvroPartition(partition, uri, conf)
+ listAvroPartition(partition, uri, spec, conf)
} else if (format.contains("parquet")) {
- listParquetPartition(partition, uri, conf, metricsConfig)
+ listParquetPartition(partition, uri, spec, conf, metricsConfig)
} else if (format.contains("orc")) {
// TODO: use MetricsConfig in listOrcPartition
- listOrcPartition(partition, uri, conf)
+ listOrcPartition(partition, uri, spec, conf)
} else {
throw new UnsupportedOperationException(s"Unknown partition format: $format")
}
@@ -200,113 +200,6 @@ object SparkTableUtil {
*/
case class SparkPartition(values: Map[String, String], uri: String, format: String)
- /**
- * Case class representing a data file.
- */
- case class SparkDataFile(
- path: String,
- partition: collection.Map[String, String],
- format: String,
- fileSize: Long,
- rowGroupSize: Long,
- rowCount: Long,
- columnSizes: Array[Long],
- valueCounts: Array[Long],
- nullValueCounts: Array[Long],
- lowerBounds: Seq[Array[Byte]],
- upperBounds: Seq[Array[Byte]]
- ) {
-
- /**
- * Convert this to a [[DataFile]] that can be added to a [[org.apache.iceberg.Table]].
- *
- * @param spec a [[PartitionSpec]] that will be used to parse the partition key
- * @return a [[DataFile]] that can be passed to [[org.apache.iceberg.AppendFiles]]
- */
- def toDataFile(spec: PartitionSpec): DataFile = {
- // values are strings, so pass a path to let the builder coerce to the right types
- val partitionKey = spec.fields.asScala.map(_.name).map { name =>
- s"$name=${partition(name)}"
- }.mkString("/")
-
- DataFiles.builder(spec)
- .withPath(path)
- .withFormat(format)
- .withFileSizeInBytes(fileSize)
- .withMetrics(new Metrics(rowCount,
- arrayToMap(columnSizes),
- arrayToMap(valueCounts),
- arrayToMap(nullValueCounts),
- arrayToMap(lowerBounds),
- arrayToMap(upperBounds)))
- .withPartitionPath(partitionKey)
- .build()
- }
- }
-
- private def bytesMapToArray(map: java.util.Map[Integer, ByteBuffer]): Seq[Array[Byte]] = {
- if (map != null && !map.isEmpty) {
- val keys = map.keySet.asScala
- val max = keys.max
- val arr = Array.fill(max + 1)(null.asInstanceOf[Array[Byte]])
-
- keys.foreach { key =>
- val buffer = map.get(key)
-
- val copy = if (buffer.hasArray) {
- val bytes = buffer.array()
- if (buffer.arrayOffset() == 0 && buffer.position() == 0 &&
- bytes.length == buffer.remaining()) {
- bytes
- } else {
- val start = buffer.arrayOffset() + buffer.position()
- val end = start + buffer.remaining()
- util.Arrays.copyOfRange(bytes, start, end);
- }
- } else {
- val bytes = Array.fill(buffer.remaining())(0.asInstanceOf[Byte])
- buffer.get(bytes)
- bytes
- }
-
- arr.update(key, copy)
- }
-
- arr
- } else {
- null
- }
- }
-
- private def mapToArray(map: java.util.Map[Integer, java.lang.Long]): Array[Long] = {
- if (map != null && !map.isEmpty) {
- val keys = map.keySet.asScala
- val max = keys.max
- val arr = Array.fill(max + 1)(-1L)
-
- keys.foreach { key =>
- arr.update(key, map.get(key))
- }
-
- arr
- } else {
- null
- }
- }
-
- private def arrayToMap(arr: Seq[Array[Byte]]): java.util.Map[Integer, ByteBuffer] = {
- if (arr != null) {
- val map: java.util.Map[Integer, ByteBuffer] = Maps.newHashMap()
- arr.zipWithIndex.foreach {
- case (null, _) => // skip
- case (value, index) => map.put(index, ByteBuffer.wrap(value))
- }
- map
- } else {
- null
- }
- }
-
private def arrayToMap(arr: Array[Long]): java.util.Map[Integer, java.lang.Long] = {
if (arr != null) {
val map: java.util.Map[Integer, java.lang.Long] = Maps.newHashMap()
@@ -329,21 +222,24 @@ object SparkTableUtil {
private def listAvroPartition(
partitionPath: Map[String, String],
partitionUri: String,
- conf: Configuration): Seq[SparkDataFile] = {
+ spec: PartitionSpec,
+ conf: Configuration): Seq[DataFile] = {
val partition = new Path(partitionUri)
val fs = partition.getFileSystem(conf)
fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
- SparkDataFile(
- stat.getPath.toString,
- partitionPath, "avro", stat.getLen,
- stat.getBlockSize,
- -1,
- null,
- null,
- null,
- null,
- null)
+ val metrics = new Metrics(-1L, arrayToMap(null), arrayToMap(null), arrayToMap(null))
+ val partitionKey = spec.fields.asScala.map(_.name).map { name =>
+ s"$name=${partitionPath(name)}"
+ }.mkString("/")
+
+ DataFiles.builder(spec)
+ .withPath(stat.getPath.toString)
+ .withFormat("avro")
+ .withFileSizeInBytes(stat.getLen)
+ .withMetrics(metrics)
+ .withPartitionPath(partitionKey)
+ .build()
}
}
@@ -351,48 +247,49 @@ object SparkTableUtil {
private def listParquetPartition(
partitionPath: Map[String, String],
partitionUri: String,
+ spec: PartitionSpec,
conf: Configuration,
- metricsSpec: MetricsConfig): Seq[SparkDataFile] = {
+ metricsSpec: MetricsConfig): Seq[DataFile] = {
val partition = new Path(partitionUri)
val fs = partition.getFileSystem(conf)
fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
val metrics = ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat), metricsSpec)
+ val partitionKey = spec.fields.asScala.map(_.name).map { name =>
+ s"$name=${partitionPath(name)}"
+ }.mkString("/")
- SparkDataFile(
- stat.getPath.toString,
- partitionPath, "parquet", stat.getLen,
- stat.getBlockSize,
- metrics.recordCount,
- mapToArray(metrics.columnSizes),
- mapToArray(metrics.valueCounts),
- mapToArray(metrics.nullValueCounts),
- bytesMapToArray(metrics.lowerBounds),
- bytesMapToArray(metrics.upperBounds))
+ DataFiles.builder(spec)
+ .withPath(stat.getPath.toString)
+ .withFormat("parquet")
+ .withFileSizeInBytes(stat.getLen)
+ .withMetrics(metrics)
+ .withPartitionPath(partitionKey)
+ .build()
}
}
private def listOrcPartition(
partitionPath: Map[String, String],
partitionUri: String,
- conf: Configuration): Seq[SparkDataFile] = {
+ spec: PartitionSpec,
+ conf: Configuration): Seq[DataFile] = {
val partition = new Path(partitionUri)
val fs = partition.getFileSystem(conf)
fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
val metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath, conf))
+ val partitionKey = spec.fields.asScala.map(_.name).map { name =>
+ s"$name=${partitionPath(name)}"
+ }.mkString("/")
- SparkDataFile(
- stat.getPath.toString,
- partitionPath, "orc", stat.getLen,
- stat.getBlockSize,
- metrics.recordCount,
- mapToArray(metrics.columnSizes),
- mapToArray(metrics.valueCounts),
- mapToArray(metrics.nullValueCounts),
- bytesMapToArray(metrics.lowerBounds()),
- bytesMapToArray(metrics.upperBounds())
- )
+ DataFiles.builder(spec)
+ .withPath(stat.getPath.toString)
+ .withFormat("orc")
+ .withFileSizeInBytes(stat.getLen)
+ .withMetrics(metrics)
+ .withPartitionPath(partitionKey)
+ .build()
}
}
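Each listing helper above now derives the Hive-style partition path ("name=value" segments joined with "/") from the spec and hands it to DataFiles.builder.withPartitionPath, which parses the string values into the partition tuple. A small, self-contained illustration of that key construction (the schema, spec, and values are hypothetical):

    import scala.collection.JavaConverters._
    import org.apache.iceberg.{PartitionSpec, Schema}
    import org.apache.iceberg.types.Types

    // Hypothetical identity-partitioned spec over a "day" column.
    val schema = new Schema(
      Types.NestedField.required(1, "id", Types.LongType.get()),
      Types.NestedField.required(2, "day", Types.StringType.get()))
    val spec = PartitionSpec.builderFor(schema).identity("day").build()

    // Same construction the helpers use: spec field names looked up in the
    // partition map and joined as a Hive-style path.
    val partitionValues = Map("day" -> "2020-02-19")
    val partitionKey = spec.fields.asScala.map(_.name).map { name =>
      s"$name=${partitionValues(name)}"
    }.mkString("/")
    // partitionKey == "day=2020-02-19"; withPartitionPath coerces the string
    // values to the spec's field types when the DataFile is built.
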
@@ -421,7 +318,7 @@ object SparkTableUtil {
private def buildManifest(
conf: SerializableConfiguration,
spec: PartitionSpec,
- basePath: String): Iterator[SparkDataFile] => Iterator[ManifestFile] = { files =>
+ basePath: String): Iterator[DataFile] => Iterator[ManifestFile] = { files =>
if (files.hasNext) {
val io = new HadoopFileIO(conf.get())
val ctx = TaskContext.get()
@@ -429,9 +326,7 @@ object SparkTableUtil {
val outputFile = io.newOutputFile(FileFormat.AVRO.addExtension(location.toString))
val writer = ManifestWriter.write(spec, outputFile)
try {
- files.foreach { file =>
- writer.add(file.toDataFile(spec))
- }
+ files.foreach(writer.add)
} finally {
writer.close()
}
@@ -489,13 +384,15 @@ object SparkTableUtil {
val format = sourceTable.storage.serde.orElse(sourceTable.provider)
require(format.nonEmpty, "Could not determine table format")
+ val partition = Map.empty[String, String]
+ val spec = PartitionSpec.unpartitioned()
val conf = spark.sessionState.newHadoopConf()
val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)
- val files = listPartition(Map.empty, sourceTable.location.toString, format.get, conf, metricsConfig)
+ val files = listPartition(partition, sourceTable.location.toString, format.get, spec, conf, metricsConfig)
val append = targetTable.newAppend()
- files.foreach(file => append.appendFile(file.toDataFile(PartitionSpec.unpartitioned)))
+ files.foreach(append.appendFile)
append.commit()
}
@@ -516,6 +413,8 @@ object SparkTableUtil {
stagingDir: String): Unit = {
implicit val manifestFileEncoder: Encoder[ManifestFile] = Encoders.javaSerialization[ManifestFile]
+ implicit val dataFileEncoder: Encoder[DataFile] = Encoders.javaSerialization[DataFile]
+ implicit val pathDataFileEncoder: Encoder[(String, DataFile)] = Encoders.tuple(Encoders.STRING, dataFileEncoder)
import spark.implicits._
@@ -527,10 +426,11 @@ object SparkTableUtil {
val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)
val manifests = partitionDS
- .flatMap(partition => listPartition(partition, serializableConf, metricsConfig))
+ .flatMap(partition => listPartition(partition, spec, serializableConf, metricsConfig))
.repartition(numShufflePartitions)
- .orderBy($"path")
- .mapPartitions(buildManifest(serializableConf, spec, stagingDir))
+ .map(file => (file.path.toString, file))
+ .orderBy($"_1")
+ .mapPartitions(files => buildManifest(serializableConf, spec, stagingDir)(files.map(_._2)))
.collect()
try {
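
Because DataFile is an interface rather than a Spark product type, the parallel import path registers explicit encoders before flatMapping partitions into a Dataset[DataFile], and files are keyed by path so the Dataset can still be globally ordered: a Java-serialized Dataset[DataFile] exposes no path column, which is why the old orderBy($"path") becomes a map to (path, file) followed by orderBy($"_1"). A minimal sketch of that encoder setup (the SparkSession here is a placeholder; the encoders mirror the commit):

    import org.apache.iceberg.DataFile
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

    val spark = SparkSession.builder().master("local[*]").appName("iceberg-import").getOrCreate()

    // DataFile is not a case class, so Spark cannot derive an encoder for it;
    // Java serialization encoders make Dataset[DataFile] operations possible.
    implicit val dataFileEncoder: Encoder[DataFile] =
      Encoders.javaSerialization[DataFile]
    implicit val pathDataFileEncoder: Encoder[(String, DataFile)] =
      Encoders.tuple(Encoders.STRING, dataFileEncoder)

    // With these in scope, listed DataFiles can be keyed by path, ordered, and
    // passed partition-by-partition to buildManifest, as in the diff above.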