Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/09/03 19:09:49 UTC

[iceberg] 08/09: Spark: Fix stats in rewrite metadata action (#5691)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 7feb9457bd095b1d5e3d6224db4780cc5b833a36
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Sep 1 23:20:57 2022 -0700

    Spark: Fix stats in rewrite metadata action (#5691)
    
    * Core: Don't show dropped fields from the partition spec
    
    * Use projection instead
    
    * Use StructProjection in SparkDataFile.
    
    Co-authored-by: Fokko Driesprong <fo...@apache.org>
---
 .../extensions/TestRewriteManifestsProcedure.java  |  29 ++++++
 .../org/apache/iceberg/spark/SparkDataFile.java    |  21 ++++-
 .../spark/actions/RewriteManifestsSparkAction.java | 105 ++++++++++++++++-----
 .../extensions/TestRewriteManifestsProcedure.java  |  29 ++++++
 .../org/apache/iceberg/spark/SparkDataFile.java    |  21 ++++-
 .../spark/actions/RewriteManifestsSparkAction.java | 105 ++++++++++++++++-----
 6 files changed, 262 insertions(+), 48 deletions(-)
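
    The change below swaps the raw wrapped partition for a StructProjection, so that a
    SparkDataFile built over the table's combined partition struct only exposes the fields
    of the spec a manifest is written for. As context, here is a minimal, standalone sketch
    of that projection idea; it is not part of the patch, and the class name, field ids
    (1000/1001) and partition values (18993) are illustrative only:

    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructProjection;

    public class PartitionProjectionSketch {
      public static void main(String[] args) {
        // Combined partition type across all specs the table has had:
        // the dropped day_of_ts field plus the ts_day field added by
        // REPLACE PARTITION FIELD (field ids are illustrative).
        Types.StructType combined =
            Types.StructType.of(
                Types.NestedField.optional(1000, "day_of_ts", Types.IntegerType.get()),
                Types.NestedField.optional(1001, "ts_day", Types.IntegerType.get()));

        // The current spec only has ts_day; a manifest written for that spec
        // must expose that single field.
        Types.StructType current =
            Types.StructType.of(
                Types.NestedField.optional(1001, "ts_day", Types.IntegerType.get()));

        // A partition tuple holding values for both fields, standing in for
        // the SparkStructLike that wraps the Spark Row in SparkDataFile.
        StructLike full =
            new StructLike() {
              private final Object[] values = {18993, 18993}; // days from epoch

              @Override
              public int size() {
                return values.length;
              }

              @Override
              public <T> T get(int pos, Class<T> javaClass) {
                return javaClass.cast(values[pos]);
              }

              @Override
              public <T> void set(int pos, T value) {
                values[pos] = value;
              }
            };

        // StructProjection matches fields by id, so the projection reads only
        // the ts_day position from the combined struct.
        StructLike projected = StructProjection.create(combined, current).wrap(full);
        System.out.println(projected.size());                // 1
        System.out.println(projected.get(0, Integer.class)); // 18993
      }
    }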

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index dcf0a2d91e..0d10cb0d7d 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.spark.extensions;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
@@ -171,4 +173,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
         IllegalArgumentException.class, "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
   }
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index a6390d39c5..5fe0cd86a4 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
@@ -53,13 +54,29 @@ public class SparkDataFile implements DataFile {
   private final Type keyMetadataType;
 
   private final SparkStructLike wrappedPartition;
+  private final StructLike partitionProjection;
   private Row wrapped;
 
   public SparkDataFile(Types.StructType type, StructType sparkType) {
+    this(type, null, sparkType);
+  }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }
 
     Map<String, Integer> positions = Maps.newHashMap();
     type.fields().forEach(field -> {
@@ -114,7 +131,7 @@ public class SparkDataFile implements DataFile {
 
   @Override
   public StructLike partition() {
-    return wrappedPartition;
+    return partitionProjection;
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 99e51a37aa..030532fa94 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -200,6 +201,7 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we rely only on the target number of manifests for unpartitioned tables
     // as we should not worry about having too much metadata per partition
@@ -208,9 +210,15 @@ public class RewriteManifestsSparkAction
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-            manifestEncoder
-        )
+            toManifests(
+                io,
+                maxNumManifestEntries,
+                stagingLocation,
+                formatVersion,
+                combinedPartitionType,
+                spec,
+                sparkType),
+            manifestEncoder)
         .collectAsList();
   }
 
@@ -220,20 +228,29 @@ public class RewriteManifestsSparkAction
 
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we allow the actual size of manifests to be 10% higher if the estimation is not precise enough
     long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
 
-    return withReusableDS(manifestEntryDF, df -> {
-      Column partitionColumn = df.col("data_file.partition");
-      return df.repartitionByRange(numManifests, partitionColumn)
-          .sortWithinPartitions(partitionColumn)
-          .mapPartitions(
-              toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-              manifestEncoder
-          )
-          .collectAsList();
-    });
+    return withReusableDS(
+        manifestEntryDF,
+        df -> {
+          Column partitionColumn = df.col("data_file.partition");
+          return df.repartitionByRange(numManifests, partitionColumn)
+              .sortWithinPartitions(partitionColumn)
+              .mapPartitions(
+                  toManifests(
+                      io,
+                      maxNumManifestEntries,
+                      stagingLocation,
+                      formatVersion,
+                      combinedPartitionType,
+                      spec,
+                      sparkType),
+                  manifestEncoder)
+              .collectAsList();
+        });
   }
 
   private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
@@ -317,15 +334,24 @@ public class RewriteManifestsSparkAction
   }
 
   private static ManifestFile writeManifest(
-      List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+      List<Row> rows,
+      int startIndex,
+      int endIndex,
+      Broadcast<FileIO> io,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType)
+      throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
 
@@ -345,8 +371,13 @@ public class RewriteManifestsSparkAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io, long maxNumManifestEntries, String location,
-      int format, PartitionSpec spec, StructType sparkType) {
+      Broadcast<FileIO> io,
+      long maxNumManifestEntries,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType) {
 
     return rows -> {
       List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -357,11 +388,41 @@ public class RewriteManifestsSparkAction
 
       List<ManifestFile> manifests = Lists.newArrayList();
       if (rowsAsList.size() <= maxNumManifestEntries) {
-        manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
-        manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
-        manifests.add(writeManifest(rowsAsList,  midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                midIndex,
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                midIndex,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       }
 
       return manifests.iterator();
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index dcf0a2d91e..0d10cb0d7d 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.spark.extensions;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
@@ -171,4 +173,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
         IllegalArgumentException.class, "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
   }
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index a6390d39c5..5fe0cd86a4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
@@ -53,13 +54,29 @@ public class SparkDataFile implements DataFile {
   private final Type keyMetadataType;
 
   private final SparkStructLike wrappedPartition;
+  private final StructLike partitionProjection;
   private Row wrapped;
 
   public SparkDataFile(Types.StructType type, StructType sparkType) {
+    this(type, null, sparkType);
+  }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }
 
     Map<String, Integer> positions = Maps.newHashMap();
     type.fields().forEach(field -> {
@@ -114,7 +131,7 @@ public class SparkDataFile implements DataFile {
 
   @Override
   public StructLike partition() {
-    return wrappedPartition;
+    return partitionProjection;
   }
 
   @Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 99e51a37aa..030532fa94 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -200,6 +201,7 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we rely only on the target number of manifests for unpartitioned tables
     // as we should not worry about having too much metadata per partition
@@ -208,9 +210,15 @@ public class RewriteManifestsSparkAction
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-            manifestEncoder
-        )
+            toManifests(
+                io,
+                maxNumManifestEntries,
+                stagingLocation,
+                formatVersion,
+                combinedPartitionType,
+                spec,
+                sparkType),
+            manifestEncoder)
         .collectAsList();
   }
 
@@ -220,20 +228,29 @@ public class RewriteManifestsSparkAction
 
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we allow the actual size of manifests to be 10% higher if the estimation is not precise enough
     long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
 
-    return withReusableDS(manifestEntryDF, df -> {
-      Column partitionColumn = df.col("data_file.partition");
-      return df.repartitionByRange(numManifests, partitionColumn)
-          .sortWithinPartitions(partitionColumn)
-          .mapPartitions(
-              toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-              manifestEncoder
-          )
-          .collectAsList();
-    });
+    return withReusableDS(
+        manifestEntryDF,
+        df -> {
+          Column partitionColumn = df.col("data_file.partition");
+          return df.repartitionByRange(numManifests, partitionColumn)
+              .sortWithinPartitions(partitionColumn)
+              .mapPartitions(
+                  toManifests(
+                      io,
+                      maxNumManifestEntries,
+                      stagingLocation,
+                      formatVersion,
+                      combinedPartitionType,
+                      spec,
+                      sparkType),
+                  manifestEncoder)
+              .collectAsList();
+        });
   }
 
   private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
@@ -317,15 +334,24 @@ public class RewriteManifestsSparkAction
   }
 
   private static ManifestFile writeManifest(
-      List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+      List<Row> rows,
+      int startIndex,
+      int endIndex,
+      Broadcast<FileIO> io,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType)
+      throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
 
@@ -345,8 +371,13 @@ public class RewriteManifestsSparkAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io, long maxNumManifestEntries, String location,
-      int format, PartitionSpec spec, StructType sparkType) {
+      Broadcast<FileIO> io,
+      long maxNumManifestEntries,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType) {
 
     return rows -> {
       List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -357,11 +388,41 @@ public class RewriteManifestsSparkAction
 
       List<ManifestFile> manifests = Lists.newArrayList();
       if (rowsAsList.size() <= maxNumManifestEntries) {
-        manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
-        manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
-        manifests.add(writeManifest(rowsAsList,  midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                midIndex,
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                midIndex,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       }
 
       return manifests.iterator();