Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/09/24 23:24:21 UTC

[incubator-iceberg] branch master updated: Spark: Allow limiting output data file size (#432)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c2435d6  Spark: Allow limiting output data file size (#432)
c2435d6 is described below

commit c2435d64709b9657768f35ae5048c9ef4a352716
Author: Xabriel J. Collazo Mojica <xc...@adobe.com>
AuthorDate: Tue Sep 24 16:24:16 2019 -0700

    Spark: Allow limiting output data file size (#432)
---
 .../java/org/apache/iceberg/TableProperties.java   |   3 +
 .../org/apache/iceberg/parquet/ParquetWriter.java  |   2 +-
 site/docs/configuration.md                         |   2 +
 .../org/apache/iceberg/spark/source/Writer.java    | 276 ++++++++++++---------
 .../iceberg/spark/source/TestParquetWrite.java     |  91 +++++++
 5 files changed, 261 insertions(+), 113 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 88057bf..7479fd6 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -100,4 +100,7 @@ public class TableProperties {
 
   public static final String WRITE_AUDIT_PUBLISH_ENABLED = "write.wap.enabled";
   public static final String WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT = "false";
+
+  public static final String WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes";
+  public static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = Long.MAX_VALUE;
 }
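
For readers who want to set the new property on an existing table, a minimal sketch using Iceberg's UpdateProperties API follows; the HadoopTables loading path and the 512 MB figure are illustrative assumptions, and any catalog works the same way.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class SetTargetFileSize {
      public static void main(String[] args) {
        // Load the table; HadoopTables is just one way to do this (assumption for this sketch).
        Table table = new HadoopTables(new Configuration()).load("/tmp/warehouse/db/events");

        // Ask writers to roll to a new data file after roughly 512 MB.
        table.updateProperties()
            .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, String.valueOf(512L * 1024 * 1024))
            .commit();
      }
    }
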
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 593de9b..14838d5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -126,7 +126,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   @Override
   public long length() {
     try {
-      return writer.getPos();
+      return writer.getPos() + writeStore.getBufferedSize();
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to get file length");
     }
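
The buffered size matters because Parquet holds a whole row group in memory before flushing, so writer.getPos() alone lags well behind the rows already handed to the appender. A hedged sketch of the size-based roll-over this enables is below; FileAppender and length() are real Iceberg APIs, while maybeRoll and the supplier of new appenders are hypothetical helpers for illustration only.

    import java.io.IOException;
    import java.util.function.Supplier;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.spark.sql.catalyst.InternalRow;

    class RollOverSketch {
      // With buffered bytes included in length(), this check can fire while the current
      // row group is still in memory, instead of only after Parquet flushes a row group.
      static FileAppender<InternalRow> maybeRoll(FileAppender<InternalRow> current, long targetFileSize,
                                                 Supplier<FileAppender<InternalRow>> newAppender) throws IOException {
        if (current.length() >= targetFileSize) {
          current.close();           // flush and finalize the current data file
          return newAppender.get();  // open the next output file
        }
        return current;
      }
    }
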
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index 3e939fe..419ef22 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -25,6 +25,7 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.metadata.compression-codec   | none               | Metadata compression codec; none or gzip           |
 | write.metadata.metrics.default     | truncate(16)       | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
 | write.metadata.metrics.column.col1 | (not set)          | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
+| write.target-file-size-bytes       | Long.MAX_VALUE     | Target size in bytes for generated data files; writers roll to a new file once a file reaches about this many bytes |
 
 ### Table behavior properties
 
@@ -75,4 +76,5 @@ df.write
 | Spark option | Default                    | Description                                                  |
 | ------------ | -------------------------- | ------------------------------------------------------------ |
 | write-format | Table write.format.default | File format to use for this write operation; parquet or avro |
+| target-file-size-bytes | Table write.target-file-size-bytes | Overrides the table's write.target-file-size-bytes for this write |
 
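To make the two knobs concrete, here is a minimal sketch of the per-write override from Spark's Java API; the DataFrame, the table location, and the 128 MB figure are illustrative assumptions (the per-table default is set through the write.target-file-size-bytes property shown in the table above).

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public class TargetFileSizeWrite {
      // 'df' is any DataFrame matching the table schema; 'location' is the Iceberg table path.
      static void appendWithTarget(Dataset<Row> df, String location) {
        df.write()
            .format("iceberg")
            .mode("append")
            // per-write override of the table's write.target-file-size-bytes
            .option("target-file-size-bytes", 128L * 1024 * 1024)
            .save(location);
      }
    }
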
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 889118b..b05fe3b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -19,12 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
@@ -33,7 +31,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.function.Function;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -57,6 +54,7 @@ import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.data.SparkAvroWriter;
 import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -77,6 +75,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 // TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
 class Writer implements DataSourceWriter {
@@ -89,6 +89,7 @@ class Writer implements DataSourceWriter {
   private final boolean replacePartitions;
   private final String applicationId;
   private final String wapId;
+  private final long targetFileSize;
 
   Writer(Table table, DataSourceOptions options, boolean replacePartitions, String applicationId) {
     this(table, options, replacePartitions, applicationId, null);
@@ -102,6 +103,10 @@ class Writer implements DataSourceWriter {
     this.replacePartitions = replacePartitions;
     this.applicationId = applicationId;
     this.wapId = wapId;
+
+    long tableTargetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);
   }
 
   private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
@@ -119,7 +124,7 @@ class Writer implements DataSourceWriter {
   @Override
   public DataWriterFactory<InternalRow> createWriterFactory() {
     return new WriterFactory(
-        table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager);
+        table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager, targetFileSize);
   }
 
   @Override
@@ -242,34 +247,31 @@ class Writer implements DataSourceWriter {
     private final FileFormat format;
     private final LocationProvider locations;
     private final Map<String, String> properties;
-    private final String uuid = UUID.randomUUID().toString();
     private final FileIO fileIo;
     private final EncryptionManager encryptionManager;
+    private final long targetFileSize;
 
     WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
-                  Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager) {
+                  Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
+                  long targetFileSize) {
       this.spec = spec;
       this.format = format;
       this.locations = locations;
       this.properties = properties;
       this.fileIo = fileIo;
       this.encryptionManager = encryptionManager;
+      this.targetFileSize = targetFileSize;
     }
 
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
-      String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid));
-      AppenderFactory<InternalRow> factory = new SparkAppenderFactory();
+      OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, epochId);
+      AppenderFactory<InternalRow> appenderFactory = new SparkAppenderFactory();
+
       if (spec.fields().isEmpty()) {
-        OutputFile outputFile = fileIo.newOutputFile(locations.newDataLocation(filename));
-        return new UnpartitionedWriter(encryptionManager.encrypt(outputFile), format, factory, fileIo);
+        return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
       } else {
-        Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey =
-            key -> {
-              OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, filename));
-              return encryptionManager.encrypt(rawOutputFile);
-            };
-        return new PartitionedWriter(spec, format, factory, newOutputFileForKey, fileIo);
+        return new PartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
       }
     }
 
@@ -305,142 +307,119 @@ class Writer implements DataSourceWriter {
         }
       }
     }
-  }
-
-  private interface AppenderFactory<T> {
-    FileAppender<T> newAppender(OutputFile file, FileFormat format);
-  }
-
-  private static class UnpartitionedWriter implements DataWriter<InternalRow>, Closeable {
-    private final FileIO fileIo;
-    private FileAppender<InternalRow> appender = null;
-    private Metrics metrics = null;
-    private List<Long> offsetRanges = null;
-    private final EncryptedOutputFile file;
-
-    UnpartitionedWriter(
-        EncryptedOutputFile outputFile,
-        FileFormat format,
-        AppenderFactory<InternalRow> factory,
-        FileIO fileIo) {
-      this.fileIo = fileIo;
-      this.file = outputFile;
-      this.appender = factory.newAppender(file.encryptingOutputFile(), format);
-    }
 
-    @Override
-    public void write(InternalRow record) {
-      appender.add(record);
-    }
-
-    @Override
-    public WriterCommitMessage commit() throws IOException {
-      Preconditions.checkArgument(appender != null, "Commit called on a closed writer: %s", this);
-
-      // metrics and splitOffsets are populated on close
-      close();
-
-      if (metrics.recordCount() == 0L) {
-        fileIo.deleteFile(file.encryptingOutputFile());
-        return new TaskCommit();
+    private class OutputFileFactory {
+      private final int partitionId;
+      private final long taskId;
+      private final long epochId;
+      // The uuid ties together all files written by the same operation: two paths that share a uuid were written
+      // by the same write. That is useful, for example, when a Spark job dies and leaves files behind in the file
+      // system; a recursive listing and grep for the uuid finds them all.
+      private final String uuid = UUID.randomUUID().toString();
+      private int fileCount;
+
+      OutputFileFactory(int partitionId, long taskId, long epochId) {
+        this.partitionId = partitionId;
+        this.taskId = taskId;
+        this.epochId = epochId;
+        this.fileCount = 0;
       }
 
-      DataFile dataFile = DataFiles.fromEncryptedOutputFile(file, null, metrics, offsetRanges);
-
-      return new TaskCommit(dataFile);
-    }
-
-    @Override
-    public void abort() throws IOException {
-      Preconditions.checkArgument(appender != null, "Abort called on a closed writer: %s", this);
+      private String generateFilename() {
+        return format.addExtension(String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount++));
+      }
 
-      close();
-      fileIo.deleteFile(file.encryptingOutputFile());
-    }
+      /**
+       * Generates EncryptedOutputFile for UnpartitionedWriter.
+       */
+      public EncryptedOutputFile newOutputFile() {
+        OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+        return encryptionManager.encrypt(file);
+      }
 
-    @Override
-    public void close() throws IOException {
-      if (this.appender != null) {
-        this.appender.close();
-        this.metrics = appender.metrics();
-        this.offsetRanges = appender.splitOffsets();
-        this.appender = null;
+      /**
+       * Generates EncryptedOutputFile for PartitionedWriter.
+       */
+      public EncryptedOutputFile newOutputFile(PartitionKey key) {
+        OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+        return encryptionManager.encrypt(rawOutputFile);
       }
     }
   }
 
-  private static class PartitionedWriter implements DataWriter<InternalRow> {
-    private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
+  private interface AppenderFactory<T> {
+    FileAppender<T> newAppender(OutputFile file, FileFormat format);
+  }
+
+  private abstract static class BaseWriter implements DataWriter<InternalRow> {
+    protected static final int ROWS_DIVISOR = 1000;
+
     private final List<DataFile> completedFiles = Lists.newArrayList();
     private final PartitionSpec spec;
     private final FileFormat format;
-    private final AppenderFactory<InternalRow> factory;
-    private final Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey;
-    private final PartitionKey key;
+    private final AppenderFactory<InternalRow> appenderFactory;
+    private final WriterFactory.OutputFileFactory fileFactory;
     private final FileIO fileIo;
-
+    private final long targetFileSize;
     private PartitionKey currentKey = null;
     private FileAppender<InternalRow> currentAppender = null;
     private EncryptedOutputFile currentFile = null;
+    private long currentRows = 0;
 
-    PartitionedWriter(
-        PartitionSpec spec,
-        FileFormat format,
-        AppenderFactory<InternalRow> factory,
-        Function<PartitionKey, EncryptedOutputFile> newOutputFileForKey,
-        FileIO fileIo) {
+    BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
+               WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
       this.spec = spec;
       this.format = format;
-      this.factory = factory;
-      this.newOutputFileForKey = newOutputFileForKey;
-      this.key = new PartitionKey(spec);
+      this.appenderFactory = appenderFactory;
+      this.fileFactory = fileFactory;
       this.fileIo = fileIo;
+      this.targetFileSize = targetFileSize;
     }
 
     @Override
-    public void write(InternalRow row) throws IOException {
-      key.partition(row);
+    public abstract void write(InternalRow row) throws IOException;
 
-      if (!key.equals(currentKey)) {
+    public void writeInternal(InternalRow row) throws IOException {
+      if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
         closeCurrent();
-
-        if (completedPartitions.contains(key)) {
-          // if rows are not correctly grouped, detect and fail the write
-          PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
-          LOG.warn("Duplicate key: {} == {}", existingKey, key);
-          throw new IllegalStateException("Already closed file for partition: " + key.toPath());
-        }
-
-        this.currentKey = key.copy();
-        this.currentFile = newOutputFileForKey.apply(currentKey);
-        this.currentAppender = factory.newAppender(currentFile.encryptingOutputFile(), format);
+        openCurrent();
       }
 
       currentAppender.add(row);
+      currentRows++;
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
       closeCurrent();
+
       return new TaskCommit(completedFiles);
     }
 
     @Override
     public void abort() throws IOException {
+      closeCurrent();
+
       // clean up files created by this writer
       Tasks.foreach(completedFiles)
           .throwFailureWhenFinished()
           .noRetry()
           .run(file -> fileIo.deleteFile(file.path().toString()));
+    }
 
-      if (currentAppender != null) {
-        currentAppender.close();
-        this.currentAppender = null;
-        fileIo.deleteFile(currentFile.encryptingOutputFile());
+    protected void openCurrent() {
+      if (spec.fields().size() == 0) {
+        // unpartitioned
+        currentFile = fileFactory.newOutputFile();
+      } else {
+        // partitioned
+        currentFile = fileFactory.newOutputFile(currentKey);
       }
+      currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
+      currentRows = 0;
     }
 
-    private void closeCurrent() throws IOException {
+    protected void closeCurrent() throws IOException {
       if (currentAppender != null) {
         currentAppender.close();
         // metrics are only valid after the appender is closed
@@ -448,16 +427,89 @@ class Writer implements DataSourceWriter {
         List<Long> splitOffsets = currentAppender.splitOffsets();
         this.currentAppender = null;
 
-        DataFile dataFile = DataFiles.builder(spec)
-            .withEncryptedOutputFile(currentFile)
-            .withPartition(currentKey)
-            .withMetrics(metrics)
-            .withSplitOffsets(splitOffsets)
-            .build();
+        if (metrics.recordCount() == 0L) {
+          fileIo.deleteFile(currentFile.encryptingOutputFile());
+        } else {
+          DataFile dataFile = DataFiles.builder(spec)
+              .withEncryptedOutputFile(currentFile)
+              .withPartition(spec.fields().size() == 0 ? null : currentKey) // set null if unpartitioned
+              .withMetrics(metrics)
+              .withSplitOffsets(splitOffsets)
+              .build();
+          completedFiles.add(dataFile);
+        }
+
+        this.currentFile = null;
+      }
+    }
+
+    protected PartitionKey getCurrentKey() {
+      return currentKey;
+    }
+
+    protected void setCurrentKey(PartitionKey currentKey) {
+      this.currentKey = currentKey;
+    }
+  }
+
+  private static class UnpartitionedWriter extends BaseWriter {
+    private static final int ROWS_DIVISOR = 1000;
+
+    UnpartitionedWriter(
+        PartitionSpec spec,
+        FileFormat format,
+        AppenderFactory<InternalRow> appenderFactory,
+        WriterFactory.OutputFileFactory fileFactory,
+        FileIO fileIo,
+        long targetFileSize) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
+
+      openCurrent();
+    }
+
+    @Override
+    public void write(InternalRow row) throws IOException {
+      writeInternal(row);
+    }
+  }
+
+  private static class PartitionedWriter extends BaseWriter {
+    private final PartitionKey key;
+    private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
+
+    PartitionedWriter(
+        PartitionSpec spec,
+        FileFormat format,
+        AppenderFactory<InternalRow> appenderFactory,
+        WriterFactory.OutputFileFactory fileFactory,
+        FileIO fileIo,
+        long targetFileSize) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
+
+      this.key = new PartitionKey(spec);
+    }
+
+    @Override
+    public void write(InternalRow row) throws IOException {
+      key.partition(row);
 
+      PartitionKey currentKey = getCurrentKey();
+      if (!key.equals(currentKey)) {
+        closeCurrent();
         completedPartitions.add(currentKey);
-        completedFiles.add(dataFile);
+
+        if (completedPartitions.contains(key)) {
+          // if rows are not correctly grouped, detect and fail the write
+          PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
+          LOG.warn("Duplicate key: {} == {}", existingKey, key);
+          throw new IllegalStateException("Already closed files for partition: " + key.toPath());
+        }
+
+        setCurrentKey(key.copy());
+        openCurrent();
       }
+
+      writeInternal(row);
     }
   }
 }
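
To summarize the behavior this rewrite introduces: BaseWriter checks the current appender's length only every ROWS_DIVISOR (1000) rows and rolls to a new file from OutputFileFactory once that length reaches the target. Below is a small, self-contained simulation of that rule with made-up row sizes and targets, purely for illustration; the byte counter stands in for currentAppender.length(), which in the real writer also includes Parquet's buffered row-group bytes (see the ParquetWriter change above).

    public class RollingSketch {
      static final int ROWS_DIVISOR = 1000;

      public static void main(String[] args) {
        long targetFileSize = 400_000;   // assumption: ~400 KB target
        long bytesPerRow = 100;          // assumption: ~100-byte rows
        long totalRows = 10_000;

        long currentRows = 0;
        long currentBytes = 0;
        int files = 1;

        for (long row = 0; row < totalRows; row++) {
          if (currentRows % ROWS_DIVISOR == 0 && currentBytes >= targetFileSize) {
            files++;                     // closeCurrent() + openCurrent() in the real writer
            currentRows = 0;
            currentBytes = 0;
          }
          currentBytes += bytesPerRow;   // stands in for currentAppender.add(row)
          currentRows++;
        }
        // 4,000 rows (~400 KB) fit per file, so this prints "10000 rows -> 3 files"
        System.out.println(totalRows + " rows -> " + files + " files");
      }
    }

Because the check runs only once every 1000 rows, a file can overshoot the target by up to 999 rows' worth of data; that granularity is also why the tests below see exactly 1000 rows per file when the target is set to 4 bytes.
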
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
index d8d164b..79a50d2 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
@@ -247,4 +248,94 @@ public class TestParquetWrite {
     Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
     Assert.assertEquals("Result rows should match", expected, actual);
   }
+
+  @Test
+  public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    table.updateProperties()
+        .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // 4 bytes: small enough that every size check starts a new file
+        .commit();
+
+    List<SimpleRecord> expected = Lists.newArrayListWithCapacity(4000);
+    for (int i = 0; i < 4000; i++) {
+      expected.add(new SimpleRecord(i, "a"));
+    }
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+    Assert.assertEquals("Result rows should match", expected, actual);
+
+    List<DataFile> files = Lists.newArrayList();
+    for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+      for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) {
+        files.add(file);
+      }
+    }
+    Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+  }
+
+  @Test
+  public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
+    for (int i = 0; i < 2000; i++) {
+      expected.add(new SimpleRecord(i, "a"));
+      expected.add(new SimpleRecord(i, "b"));
+      expected.add(new SimpleRecord(i, "c"));
+      expected.add(new SimpleRecord(i, "d"));
+    }
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").sort("data").write()
+        .format("iceberg")
+        .mode("append")
+        .option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+    Assert.assertEquals("Result rows should match", expected, actual);
+
+    List<DataFile> files = Lists.newArrayList();
+    for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+      for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) {
+        files.add(file);
+      }
+    }
+    Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+  }
 }
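
As a possible follow-up check, not part of this commit, the same manifest walk used in these tests can also report file sizes directly, since DataFile exposes fileSizeInBytes(); the helper below is hypothetical and only mirrors the pattern above.

    // Hypothetical helper, following the manifest walk used in the tests above.
    private void printDataFileSizes(Table table) throws IOException {
      for (ManifestFile manifest : table.currentSnapshot().manifests()) {
        for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) {
          System.out.printf("%s: %d bytes, %d records%n",
              file.path(), file.fileSizeInBytes(), file.recordCount());
        }
      }
    }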