Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/18 21:06:45 UTC

[GitHub] [iceberg] szehon-ho opened a new pull request #2608: Repair manifests

szehon-ho opened a new pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608


   Fixes: https://github.com/apache/iceberg/issues/2435
   
   Took Russell's suggestion; it's actually a lot easier to add an option to RewriteManifestsAction (which is already well-distributed).  Proposal: a RepairMode option; if it is REPAIR_ENTRIES, the action goes to the FileSystem to read metadata about the file and updates the manifest entry before the rewrite.
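   
   For illustration, a hypothetical call site under this proposal (the entry point and the repairMode option are sketches of the proposal, not final API):
   
       // Sketch only: repairMode() is the proposed option, not an existing method
       RewriteManifests.Result result = SparkActions.get()
           .rewriteManifests(table)
           .repairMode(RepairMode.REPAIR_ENTRIES)  // re-read data file metadata to fix each entry
           .execute();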
   
   Next steps:
   
   - repair split offsets (not implemented yet; just wanted to get a first version out)
   - more modes: RepairMode.REMOVE_DELETED_FILES, RepairMode.ADD_NEW_DATA_FILES



[GitHub] [iceberg] rdblue commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635652832



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
##########
@@ -310,37 +341,54 @@ private void deleteFiles(Iterable<String> locations) {
         .run(fileIO::deleteFile);
   }
 
-  private static ManifestFile writeManifest(
+  private static RepairManifestHelper.RepairedManifestFile writeManifest(
       List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+      String location, int format, PartitionSpec spec, StructType sparkType,
+      RepairMode mode, Broadcast<Table> broadcastTable, Broadcast<SerializableConfiguration> conf) throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
     Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType).withSpecId(spec.specId());
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
+    Set<String> repairedColumns = new HashSet<String>();

Review comment:
       Minor: we use factory methods, like `Sets.newHashSet()`.
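       
       For example, using the relocated Guava factory (a sketch of the suggested change):
       
           import org.apache.iceberg.relocated.com.google.common.collect.Sets;
       
           Set<String> repairedColumns = Sets.newHashSet();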





[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r655739073



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());

Review comment:
       Repairing manifests?





[GitHub] [iceberg] rdblue commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635653366



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+    /**
+     * Given a DataFile information, return Metrics
+     * @param format file format
+     * @param status file status
+     * @param conf Hadoop configuration
+     * @param metricsSpec metrics configuration
+     * @param mapping name mapping
+     * @return metrics
+     */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,

Review comment:
       It is rare for an Iceberg method to use `get` because it almost never aids understanding of what the method does. Unless the method is a getter method on a java bean and the class needs to conform to that convention, we should find a more descriptive verb.
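       
       For example, one more descriptive option (an illustrative rename, not prescriptive):
       
           // e.g. readMetrics, since the method reads file footers to build Metrics
           private static Metrics readMetrics(FileFormat format, FileStatus status, Configuration conf,
                                              MetricsConfig metricsSpec, NameMapping mapping) {
             // body unchanged from getMetrics above
           }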





[GitHub] [iceberg] szehon-ho commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r655671427



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+    /**
+     * Given a DataFile information, return Metrics
+     * @param format file format
+     * @param status file status
+     * @param conf Hadoop configuration
+     * @param metricsSpec metrics configuration
+     * @param mapping name mapping
+     * @return metrics
+     */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFile for potential for repair
+   * @return a set of fields in human-readable format that differ between these DataFiles
+   */
+  static Set<String> diff(DataFile first, DataFile second) {
+    Set<String> result = new HashSet<>();
+    if (first.fileSizeInBytes() != second.fileSizeInBytes()) {
+      result.add("file_size_in_bytes");
+    }
+    if (first.recordCount() != second.recordCount()) {
+      result.add("record_count");
+    }
+    if (!Objects.equals(first.columnSizes(), second.columnSizes())) {
+      result.add("column_sizes");
+    }
+    if (!Objects.equals(first.valueCounts(), second.valueCounts())) {
+      result.add("value_counts");
+    }
+    if (!Objects.equals(first.nullValueCounts(), second.nullValueCounts())) {
+      result.add("null_value_counts");
+    }
+    if (!Objects.equals(first.nanValueCounts(), second.nanValueCounts())) {
+      result.add("nan_value_counts");
+    }
+    if (!Objects.equals(first.lowerBounds(), second.lowerBounds())) {
+      result.add("lower_bounds");
+    }
+    if (!Objects.equals(first.upperBounds(), second.upperBounds())) {
+      result.add("upper_bounds");
+    }
+    return result;
+  }
+
+  /**
+   * Given a data file pointer, return a repaired version if actual file information does not match.
+   * @param file spark data file
+   * @param spec user-specified spec
+   * @param table table information
+   * @param conf Hadoop configuration
+   * @return A repaired DataFile if repair was done (file information did not match), or None if not
+   */
+  static Optional<RepairedDataFile> repairDataFile(SparkDataFile file,
+                                                   Table table,
+                                                   PartitionSpec spec,
+                                                   Configuration conf) {
+    DataFiles.Builder newDfBuilder = DataFiles.builder(spec).copy(file);
+    Path path = new Path(file.path().toString());
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      FileStatus status = fs.getFileStatus(path);
+      newDfBuilder.withStatus(status);
+      String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+      NameMapping nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+      newDfBuilder.withMetrics(getMetrics(file.format(), status, conf,
+          MetricsConfig.fromProperties(table.properties()), nameMapping));
+
+      DataFile newFile = newDfBuilder.build();
+      Set<String> diff = RepairManifestHelper.diff(file, newFile);

Review comment:
       Done





[GitHub] [iceberg] szehon-ho commented on pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#issuecomment-843612237


   FYI @aokolnychyi @RussellSpitzer @flyrain  
   
   (javadoc failure doesn't look related: Could not GET 'https://plugins.gradle.org/m2/org/jetbrains/kotlin/kotlin-stdlib-jdk8/1.3.50/kotlin-stdlib-jdk8-1.3.50.jar'.)



[GitHub] [iceberg] rdblue commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635651943



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
##########
@@ -86,26 +91,29 @@
   private static final String USE_CACHING = "use-caching";
   private static final boolean USE_CACHING_DEFAULT = true;
 
-  private final Encoder<ManifestFile> manifestEncoder;
+  private final Encoder<RepairManifestHelper.RepairedManifestFile> manifestEncoder;
   private final Table table;
   private final int formatVersion;
   private final FileIO fileIO;
+  private final SerializableConfiguration hadoopConf;
   private final long targetManifestSizeBytes;
 
   private PartitionSpec spec = null;
   private Predicate<ManifestFile> predicate = manifest -> true;
   private String stagingLocation = null;
+  private RepairMode mode = RepairMode.NONE;
 
   public BaseRewriteManifestsSparkAction(SparkSession spark, Table table) {
     super(spark);
-    this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
+    this.manifestEncoder = Encoders.javaSerialization(RepairManifestHelper.RepairedManifestFile.class);

Review comment:
       Are these changes actually needed? The `RepairedManifestFile` interface sends back the fields that were repaired. Is that specific to a file and empty if, for example, the length for all manifest entries was already correct?
   
   I don't see much benefit to doing it that way. If we were to have more specific repair operations, then we wouldn't need that interface at all because we'd already know what manifest fields are being fixed.





[GitHub] [iceberg] rdblue commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635649237



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteManifestsActionResult.java
##########
@@ -19,22 +19,27 @@
 
 package org.apache.iceberg.actions;
 
+import java.util.Set;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 
 public class BaseRewriteManifestsActionResult implements RewriteManifests.Result {
 
   private final Iterable<ManifestFile> rewrittenManifests;
   private final Iterable<ManifestFile> addedManifests;
+  private final Iterable<?> repairedManifests;
 
   public BaseRewriteManifestsActionResult(Iterable<ManifestFile> rewrittenManifests,
-                                          Iterable<ManifestFile> addedManifests) {
+                                          Iterable<ManifestFile> addedManifests,
+                                          Iterable<BaseRepairedManifestFile> repairedManifests) {

Review comment:
       You probably didn't intend to use the implementation class in this API, right?
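       
       For instance, declaring the parameter with the interface type would keep the implementation class out of the public API (a sketch, using the `RepairedManifestFile` interface mentioned above):
       
           public BaseRewriteManifestsActionResult(Iterable<ManifestFile> rewrittenManifests,
                                                   Iterable<ManifestFile> addedManifests,
                                                   Iterable<RepairedManifestFile> repairedManifests) {
             // assign fields as before
           }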





[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r655742548



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
+    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private RepairManifests.Result doExecute() {
+    // Find matching manifest entries
+    Map<String, ManifestFile> manifestFiles = findMatchingManifests();
+    Dataset<Row> manifestEntryDf = buildManifestEntryDF(new ArrayList<>(manifestFiles.values()));
+    StructType sparkType = (StructType) manifestEntryDf.schema().apply("data_file").dataType();
+    JavaRDD<Row> manifestEntryRdd = manifestEntryDf.toJavaRDD();
+    JavaPairRDD<ManifestFileInfo, Iterable<Row>> entriesByManifest = manifestEntryRdd.groupBy(
+        r -> new ManifestFileInfo(r.getString(0), r.getInt(1)));
+
+    // Calculate manifest entries for repair
+    RepairManifestHelper.RepairOptions options = new RepairManifestHelper.RepairOptions(repairMetrics);
+    Broadcast<Table> broadcastTable = sparkContext().broadcast(SerializableTable.copyOf(getTable()));
+    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
+    JavaRDD<Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>> toRepair =
+        entriesByManifest.map(calculateRepairs(broadcastTable, conf, sparkType, options))
+            // Process only manifest files with repaired entries
+        .filter(m -> StreamSupport.stream(m._2.spliterator(), false)
+            .anyMatch(p -> p.repaired));
+
+    // Write out repaired manifests
+    Broadcast<FileIO> io = sparkContext().broadcast(getFileIO());
+    JavaRDD<ManifestFile> repairedManifests = toRepair.map(
+        writeRepairs(io, broadcastTable, formatVersion, stagingLocation));
+
+    // Prepare results
+    List<ManifestFile> addedManifests = repairedManifests.collect();
+    List<ManifestFile> deletedManifests = toRepair.collect().stream().map(t -> {
+      String path = t._1().getPath();
+      ManifestFile mf = manifestFiles.get(path);
+      // Sanity check deleted file existed in original list
+      Preconditions.checkNotNull(mf, "Manifest file cannot be null for " + path);
+      return mf;
+    }).collect(Collectors.toList());
+
+    Iterable<ManifestFile> newManifests = replaceManifests(deletedManifests, addedManifests);
+    return new BaseRepairManifestsActionResult(deletedManifests, Lists.newArrayList(newManifests));
+  }
+
+
+  private Map<String, ManifestFile> findMatchingManifests() {
+    Snapshot currentSnapshot = getTable().currentSnapshot();
+    if (currentSnapshot == null) {
+      return ImmutableMap.of();
+    }
+    return currentSnapshot.dataManifests().stream()
+        .filter(manifest -> predicate.test(manifest))
+        .collect(Collectors.toMap(ManifestFile::path, mf -> mf));
+  }
+
+  private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
+    Dataset<Row> manifestDF = spark()
+        .createDataset(Lists.transform(manifests, m -> new ManifestFileInfo(m.path(), m.partitionSpecId())),
+            Encoders.bean(ManifestFileInfo.class))
+        .toDF("partition_spec_id", "manifest_path");
+    Dataset<Row> manifestEntryDF = loadMetadataTable(getTable(), ENTRIES)
+        .filter("status < 2") // select only live entries
+        .selectExpr("input_file_name() as manifest_path", "snapshot_id", "sequence_number", "data_file");
+
+    return manifestEntryDF.as("manifest_entry")
+        .join(manifestDF.as("manifest"), "manifest_path")
+        .select("manifest_entry.manifest_path", "manifest.partition_spec_id",
+            "snapshot_id", "sequence_number", "data_file");
+  }
+
+  private static Function<Tuple2<ManifestFileInfo, Iterable<Row>>,
+      Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>> calculateRepairs(
+      Broadcast<Table> broadcastTable, Broadcast<SerializableConfiguration> conf,
+      StructType sparkType, RepairManifestHelper.RepairOptions options) {
+    return manifestFile -> {
+      Iterator<Row> rowIterator = manifestFile._2().iterator();
+      ManifestFileInfo manifestInfo = manifestFile._1();
+      int specId = manifestInfo.partSpecId;
+
+      PartitionSpec spec = broadcastTable.value().specs().get(specId);
+      Types.StructType dataFileType = DataFile.getType(spec.partitionType());
+      SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType).withSpecId(spec.specId());
+
+      Iterable<PatchedManifestEntry> entries =
+          Streams.stream(rowIterator).map(r -> {
+            long snapshotId = r.getLong(2);
+            long sequenceNumber = r.getLong(3);
+            Row file = r.getStruct(4);
+            SparkDataFile dataFile = wrapper.wrap(file);
+            Optional<DataFile> repairedDataFile = RepairManifestHelper.repairDataFile(
+                dataFile, broadcastTable.value(), spec, conf.value().value(), options);
+            return repairedDataFile.map(
+                value -> new PatchedManifestEntry(snapshotId, sequenceNumber, value, true))
+                .orElseGet(() -> new PatchedManifestEntry(snapshotId, sequenceNumber, dataFile, false));
+          }).collect(Collectors.toList());
+      return new Tuple2<>(manifestFile._1(), entries);
+    };
+  }
+
+  private static Function<Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>, ManifestFile> writeRepairs(
+      Broadcast<FileIO> io, Broadcast<Table> broadcastTable,
+      int formatVersion, String location) {
+    return rows -> {
+      Iterator<PatchedManifestEntry> entryIterator = rows._2().iterator();
+      ManifestFileInfo manifestInfo = rows._1();
+      int specId = manifestInfo.partSpecId;
+      PartitionSpec spec = broadcastTable.value().specs().get(specId);
+
+      String manifestName = "repaired-m-" + UUID.randomUUID();
+      Path newManifestPath = new Path(location, manifestName);
+      OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(newManifestPath.toString()));

Review comment:
       would it make sense to parallelize this as well? Using Tasks.foreach?
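       
       For reference, the shape that pattern would take (a sketch; the executor and helper names are hypothetical):
       
           // parallelize manifest writes with Iceberg's Tasks utility
           Tasks.foreach(manifestGroups)
               .executeWith(writePool)  // hypothetical ExecutorService
               .noRetry()
               .run(group -> writeRepairedManifest(group));  // hypothetical per-manifest writer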





[GitHub] [iceberg] szehon-ho commented on pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#issuecomment-865317271


   As per the discussion that it deserves to be its own action rather than part of RewriteManifests, completely rewrote RepairManifests as a separate Spark action (BaseRepairManifestsSparkAction), and moved the logic shared with BaseRewriteManifestsSparkAction into a base class: BaseManifestSparkAction.
   
   Overall, it distributes the repair: first grouping all entries by ManifestFile, then calculating what needs to be repaired for each entry by reading various aspects of the dataFile the entry points to, and finally writing all the entries back out if any needed repair (the manifest file still retains the same number of entries).
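   
   Condensed, the pipeline looks like this (paraphrasing the diff quoted earlier in this thread; hasRepairedEntry stands in for the anyMatch filter):
   
       JavaPairRDD<ManifestFileInfo, Iterable<Row>> entriesByManifest =
           manifestEntryRdd.groupBy(r -> new ManifestFileInfo(r.getString(0), r.getInt(1)));
       JavaRDD<ManifestFile> repaired = entriesByManifest
           .map(calculateRepairs(broadcastTable, conf, sparkType, options))  // re-read data file metadata
           .filter(m -> hasRepairedEntry(m))   // keep only manifests with at least one repaired entry
           .map(writeRepairs(io, broadcastTable, formatVersion, stagingLocation));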
   
   Not all logic can be shared.  In the repair path, the specId is queried from the original manifest file and kept around to write the repaired manifest file.
   
   There is also a problem I noticed: the returned ManifestFiles of the RewriteManifests action are wrong if "snapshotIdInheritanceEnabled" is false (as this path rewrites the manifest file to a final location).  So I fixed the method while extracting it from BaseRewriteManifestsSparkAction to the new base.  (A subsequent change can fix this issue and add a test in RewriteManifests.)
   
   @rdblue @aokolnychyi @flyrain @RussellSpitzer if you guys have time for another look



[GitHub] [iceberg] szehon-ho edited a comment on pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho edited a comment on pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#issuecomment-845015249


   Great to see you back @rdblue  :)
   
   Yes, after this first implementation, I see some advantages of having a dedicated RepairManifests action.  The RewriteManifests action is compaction-oriented, and so by design it cannot run across two separate partitionSpecs, whereas RepairManifests should be able to, as it would not combine manifest files.
   
   And yes, in general I see that the two can be conceptually different, like you said.  I can spend some time looking at making this a separate action and refactoring common code into a base class.
   



[GitHub] [iceberg] szehon-ho commented on pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#issuecomment-844347011


   @flyrain thanks for the review.  So, to make sure I understand: you would prefer the result to be a Map of individual manifest-entry changes instead of a summary of changed manifest files?  I was considering that but feared the result would be too big.
   
   But if we want it this way (and extend this to a diff tool), we could change it to return a List of something like:
   
   RepairedManifestEntry(ManifestFile parentFile, DataFile entry, List<String> repairedFields)
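   
   Fleshed out, that could look like (hypothetical; not part of the PR):
   
       class RepairedManifestEntry implements Serializable {
         private final ManifestFile parentFile;      // manifest the entry came from
         private final DataFile entry;               // the repaired data file entry
         private final List<String> repairedFields;  // e.g. "record_count", "lower_bounds"
   
         RepairedManifestEntry(ManifestFile parentFile, DataFile entry, List<String> repairedFields) {
           this.parentFile = parentFile;
           this.entry = entry;
           this.repairedFields = repairedFields;
         }
       }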
   
   Is that in line with your thoughts?
   



[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r655740837



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
+    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private RepairManifests.Result doExecute() {
+    // Find matching manifest entries
+    Map<String, ManifestFile> manifestFiles = findMatchingManifests();
+    Dataset<Row> manifestEntryDf = buildManifestEntryDF(new ArrayList<>(manifestFiles.values()));
+    StructType sparkType = (StructType) manifestEntryDf.schema().apply("data_file").dataType();
+    JavaRDD<Row> manifestEntryRdd = manifestEntryDf.toJavaRDD();
+    JavaPairRDD<ManifestFileInfo, Iterable<Row>> entriesByManifest = manifestEntryRdd.groupBy(
+        r -> new ManifestFileInfo(r.getString(0), r.getInt(1)));
+
+    // Calculate manifest entries for repair
+    RepairManifestHelper.RepairOptions options = new RepairManifestHelper.RepairOptions(repairMetrics);
+    Broadcast<Table> broadcastTable = sparkContext().broadcast(SerializableTable.copyOf(getTable()));
+    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
+    JavaRDD<Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>> toRepair =
+        entriesByManifest.map(calculateRepairs(broadcastTable, conf, sparkType, options))
+            // Process only manifest files with repaired entries
+        .filter(m -> StreamSupport.stream(m._2.spliterator(), false)
+            .anyMatch(p -> p.repaired));
+
+    // Write out repaired manifests
+    Broadcast<FileIO> io = sparkContext().broadcast(getFileIO());
+    JavaRDD<ManifestFile> repairedManifests = toRepair.map(
+        writeRepairs(io, broadcastTable, formatVersion, stagingLocation));
+
+    // Prepare results
+    List<ManifestFile> addedManifests = repairedManifests.collect();
+    List<ManifestFile> deletedManifests = toRepair.collect().stream().map(t -> {
+      String path = t._1().getPath();
+      ManifestFile mf = manifestFiles.get(path);
+      // Sanity check deleted file existed in original list
+      Preconditions.checkNotNull(mf, "Manifest file cannot be null for " + path);
+      return mf;
+    }).collect(Collectors.toList());
+
+    Iterable<ManifestFile> newManifests = replaceManifests(deletedManifests, addedManifests);
+    return new BaseRepairManifestsActionResult(deletedManifests, Lists.newArrayList(newManifests));
+  }
+
+
+  private Map<String, ManifestFile> findMatchingManifests() {
+    Snapshot currentSnapshot = getTable().currentSnapshot();
+    if (currentSnapshot == null) {
+      return ImmutableMap.of();
+    }
+    return currentSnapshot.dataManifests().stream()
+        .filter(manifest -> predicate.test(manifest))
+        .collect(Collectors.toMap(ManifestFile::path, mf -> mf));
+  }
+
+  private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
+    Dataset<Row> manifestDF = spark()
+        .createDataset(Lists.transform(manifests, m -> new ManifestFileInfo(m.path(), m.partitionSpecId())),
+            Encoders.bean(ManifestFileInfo.class))
+        .toDF("partition_spec_id", "manifest_path");
+    Dataset<Row> manifestEntryDF = loadMetadataTable(getTable(), ENTRIES)
+        .filter("status < 2") // select only live entries
+        .selectExpr("input_file_name() as manifest_path", "snapshot_id", "sequence_number", "data_file");
+
+    return manifestEntryDF.as("manifest_entry")

Review comment:
       I still think we should fix this in the actual ManifestEntry Row and have the spec generated at read time; it feels a little odd to me to be joining to reconnect it when we knew the right ID when we read it the first time. I'm fine with this for now, but we really need to get SpecID into that metadata table.






[GitHub] [iceberg] szehon-ho edited a comment on pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho edited a comment on pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#issuecomment-865317271


   As per the discussion that it deserves to be its own action rather than part of RewriteManifests, completely rewrote RepairManifests as a separate Spark action (BaseRepairManifestsSparkAction), and moved the logic shared with BaseRewriteManifestsSparkAction into a base class: BaseManifestSparkAction.
   
   In summary, it distributes the repair: first grouping all entries by ManifestFile, then calculating what needs to be repaired for each entry by reading various aspects of the dataFile the entry points to, and finally writing all the entries back out if any needed repair (the manifest file still retains the same number of entries).
   
   Not all logic can be shared.  In the repair path, the specId is queried from the original manifest file and kept around to write the repaired manifest file (vs. passed in).
   
   There is also a problem I noticed: the returned ManifestFiles of the RewriteManifests action are wrong if "snapshotIdInheritanceEnabled" is false (as this path rewrites the manifest file to a final location).  So I fixed the method while extracting it from BaseRewriteManifestsSparkAction to the new base.  (A subsequent change can fix this issue and add a test in RewriteManifests.)
   
   @rdblue @aokolnychyi @flyrain @RussellSpitzer if you guys have time for another look



[GitHub] [iceberg] szehon-ho commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r655671427



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+  /**
+   * Given DataFile information, returns Metrics
+   * @param format file format
+   * @param status file status
+   * @param conf Hadoop configuration
+   * @param metricsSpec metrics configuration
+   * @param mapping name mapping
+   * @return metrics
+   */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFiles for potential repair
+   * @return a set of fields in human-readable format that differ between these DataFiles
+   */
+  static Set<String> diff(DataFile first, DataFile second) {
+    Set<String> result = new HashSet<>();
+    if (first.fileSizeInBytes() != second.fileSizeInBytes()) {
+      result.add("file_size_in_bytes");
+    }
+    if (first.recordCount() != second.recordCount()) {
+      result.add("record_count");
+    }
+    if (!Objects.equals(first.columnSizes(), second.columnSizes())) {
+      result.add("column_sizes");
+    }
+    if (!Objects.equals(first.valueCounts(), second.valueCounts())) {
+      result.add("value_counts");
+    }
+    if (!Objects.equals(first.nullValueCounts(), second.nullValueCounts())) {
+      result.add("null_value_counts");
+    }
+    if (!Objects.equals(first.nanValueCounts(), second.nanValueCounts())) {
+      result.add("nan_value_counts");
+    }
+    if (!Objects.equals(first.lowerBounds(), second.lowerBounds())) {
+      result.add("lower_bounds");
+    }
+    if (!Objects.equals(first.upperBounds(), second.upperBounds())) {
+      result.add("upper_bounds");
+    }
+    return result;
+  }
+
+  /**
+   * Given a data file pointer, return a repaired version if actual file information does not match.
+   * @param file spark data file
+   * @param spec user-specified spec
+   * @param table table information
+   * @param conf Hadoop configuration
+   * @return A repaired DataFile if repair was done (file information did not match), or None if not
+   */
+  static Optional<RepairedDataFile> repairDataFile(SparkDataFile file,
+                                                   Table table,
+                                                   PartitionSpec spec,
+                                                   Configuration conf) {
+    DataFiles.Builder newDfBuilder = DataFiles.builder(spec).copy(file);
+    Path path = new Path(file.path().toString());
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      FileStatus status = fs.getFileStatus(path);
+      newDfBuilder.withStatus(status);
+      String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+      NameMapping nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+      newDfBuilder.withMetrics(getMetrics(file.format(), status, conf,
+          MetricsConfig.fromProperties(table.properties()), nameMapping));
+
+      DataFile newFile = newDfBuilder.build();
+      Set<String> diff = RepairManifestHelper.diff(file, newFile);

Review comment:
       Done

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+  /**
+   * Given DataFile information, returns Metrics
+   * @param format file format
+   * @param status file status
+   * @param conf Hadoop configuration
+   * @param metricsSpec metrics configuration
+   * @param mapping name mapping
+   * @return metrics
+   */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFiles for potential repair
+   * @return a set of fields in human-readable format that differ between these DataFiles
+   */
+  static Set<String> diff(DataFile first, DataFile second) {
+    Set<String> result = new HashSet<>();
+    if (first.fileSizeInBytes() != second.fileSizeInBytes()) {
+      result.add("file_size_in_bytes");

Review comment:
       Done

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+  /**
+   * Given DataFile information, returns Metrics
+   * @param format file format
+   * @param status file status
+   * @param conf Hadoop configuration
+   * @param metricsSpec metrics configuration
+   * @param mapping name mapping
+   * @return metrics

Review comment:
       Removed the comment

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+  /**
+   * Given DataFile information, returns Metrics
+   * @param format file format
+   * @param status file status
+   * @param conf Hadoop configuration
+   * @param metricsSpec metrics configuration
+   * @param mapping name mapping
+   * @return metrics
+   */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,

Review comment:
       Renamed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r655739073



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation(String)}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());

Review comment:
       Repairing manifests?

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation(String)}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
+    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);

Review comment:
       REPAIR-MANIFESTS

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation(String)}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
+    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private RepairManifests.Result doExecute() {
+    // Find matching manifest entries
+    Map<String, ManifestFile> manifestFiles = findMatchingManifests();
+    Dataset<Row> manifestEntryDf = buildManifestEntryDF(new ArrayList<>(manifestFiles.values()));
+    StructType sparkType = (StructType) manifestEntryDf.schema().apply("data_file").dataType();
+    JavaRDD<Row> manifestEntryRdd = manifestEntryDf.toJavaRDD();
+    JavaPairRDD<ManifestFileInfo, Iterable<Row>> entriesByManifest = manifestEntryRdd.groupBy(
+        r -> new ManifestFileInfo(r.getString(0), r.getInt(1)));
+
+    // Calculate manifest entries for repair
+    RepairManifestHelper.RepairOptions options = new RepairManifestHelper.RepairOptions(repairMetrics);
+    Broadcast<Table> broadcastTable = sparkContext().broadcast(SerializableTable.copyOf(getTable()));
+    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
+    JavaRDD<Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>> toRepair =
+        entriesByManifest.map(calculateRepairs(broadcastTable, conf, sparkType, options))
+            // Process only manifest files with repaired entries
+        .filter(m -> StreamSupport.stream(m._2.spliterator(), false)
+            .anyMatch(p -> p.repaired));
+
+    // Write out repaired manifests
+    Broadcast<FileIO> io = sparkContext().broadcast(getFileIO());
+    JavaRDD<ManifestFile> repairedManifests = toRepair.map(
+        writeRepairs(io, broadcastTable, formatVersion, stagingLocation));
+
+    // Prepare results
+    List<ManifestFile> addedManifests = repairedManifests.collect();
+    List<ManifestFile> deletedManifests = toRepair.collect().stream().map(t -> {
+      String path = t._1().getPath();
+      ManifestFile mf = manifestFiles.get(path);
+      // Sanity check deleted file existed in original list
+      Preconditions.checkNotNull(mf, "Manifest file cannot be null for " + path);
+      return mf;
+    }).collect(Collectors.toList());
+
+    Iterable<ManifestFile> newManifests = replaceManifests(deletedManifests, addedManifests);
+    return new BaseRepairManifestsActionResult(deletedManifests, Lists.newArrayList(newManifests));
+  }
+
+
+  private Map<String, ManifestFile> findMatchingManifests() {
+    Snapshot currentSnapshot = getTable().currentSnapshot();
+    if (currentSnapshot == null) {
+      return ImmutableMap.of();
+    }
+    return currentSnapshot.dataManifests().stream()
+        .filter(manifest -> predicate.test(manifest))
+        .collect(Collectors.toMap(ManifestFile::path, mf -> mf));
+  }
+
+  private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
+    Dataset<Row> manifestDF = spark()
+        .createDataset(Lists.transform(manifests, m -> new ManifestFileInfo(m.path(), m.partitionSpecId())),
+            Encoders.bean(ManifestFileInfo.class))
+        .toDF("partition_spec_id", "manifest_path");
+    Dataset<Row> manifestEntryDF = loadMetadataTable(getTable(), ENTRIES)
+        .filter("status < 2") // select only live entries
+        .selectExpr("input_file_name() as manifest_path", "snapshot_id", "sequence_number", "data_file");
+
+    return manifestEntryDF.as("manifest_entry")

Review comment:
       I still think we should fix this in the actual ManifestEntry Row and have the spec generated at read time; it feels a little odd to me to be joining to reconnect it when we knew the right ID when we read it the first time. I'm fine with this for now, but we really need to get the spec ID into that metadata table.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation(String)}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
+    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private RepairManifests.Result doExecute() {
+    // Find matching manifest entries
+    Map<String, ManifestFile> manifestFiles = findMatchingManifests();
+    Dataset<Row> manifestEntryDf = buildManifestEntryDF(new ArrayList<>(manifestFiles.values()));
+    StructType sparkType = (StructType) manifestEntryDf.schema().apply("data_file").dataType();
+    JavaRDD<Row> manifestEntryRdd = manifestEntryDf.toJavaRDD();
+    JavaPairRDD<ManifestFileInfo, Iterable<Row>> entriesByManifest = manifestEntryRdd.groupBy(
+        r -> new ManifestFileInfo(r.getString(0), r.getInt(1)));
+
+    // Calculate manifest entries for repair
+    RepairManifestHelper.RepairOptions options = new RepairManifestHelper.RepairOptions(repairMetrics);
+    Broadcast<Table> broadcastTable = sparkContext().broadcast(SerializableTable.copyOf(getTable()));
+    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
+    JavaRDD<Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>> toRepair =
+        entriesByManifest.map(calculateRepairs(broadcastTable, conf, sparkType, options))
+            // Process only manifest files with repaired entries
+        .filter(m -> StreamSupport.stream(m._2.spliterator(), false)
+            .anyMatch(p -> p.repaired));
+
+    // Write out repaired manifests
+    Broadcast<FileIO> io = sparkContext().broadcast(getFileIO());
+    JavaRDD<ManifestFile> repairedManifests = toRepair.map(
+        writeRepairs(io, broadcastTable, formatVersion, stagingLocation));
+
+    // Prepare results
+    List<ManifestFile> addedManifests = repairedManifests.collect();
+    List<ManifestFile> deletedManifests = toRepair.collect().stream().map(t -> {
+      String path = t._1().getPath();
+      ManifestFile mf = manifestFiles.get(path);
+      // Sanity check deleted file existed in original list
+      Preconditions.checkNotNull(mf, "Manifest file cannot be null for " + path);
+      return mf;
+    }).collect(Collectors.toList());
+
+    Iterable<ManifestFile> newManifests = replaceManifests(deletedManifests, addedManifests);
+    return new BaseRepairManifestsActionResult(deletedManifests, Lists.newArrayList(newManifests));
+  }
+
+
+  private Map<String, ManifestFile> findMatchingManifests() {
+    Snapshot currentSnapshot = getTable().currentSnapshot();
+    if (currentSnapshot == null) {
+      return ImmutableMap.of();
+    }
+    return currentSnapshot.dataManifests().stream()
+        .filter(manifest -> predicate.test(manifest))
+        .collect(Collectors.toMap(ManifestFile::path, mf -> mf));
+  }
+
+  private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
+    Dataset<Row> manifestDF = spark()
+        .createDataset(Lists.transform(manifests, m -> new ManifestFileInfo(m.path(), m.partitionSpecId())),
+            Encoders.bean(ManifestFileInfo.class))
+        .toDF("partition_spec_id", "manifest_path");
+    Dataset<Row> manifestEntryDF = loadMetadataTable(getTable(), ENTRIES)
+        .filter("status < 2") // select only live entries
+        .selectExpr("input_file_name() as manifest_path", "snapshot_id", "sequence_number", "data_file");
+
+    return manifestEntryDF.as("manifest_entry")
+        .join(manifestDF.as("manifest"), "manifest_path")
+        .select("manifest_entry.manifest_path", "manifest.partition_spec_id",
+            "snapshot_id", "sequence_number", "data_file");
+  }
+
+  private static Function<Tuple2<ManifestFileInfo, Iterable<Row>>,
+      Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>> calculateRepairs(
+      Broadcast<Table> broadcastTable, Broadcast<SerializableConfiguration> conf,
+      StructType sparkType, RepairManifestHelper.RepairOptions options) {
+    return manifestFile -> {
+      Iterator<Row> rowIterator = manifestFile._2().iterator();
+      ManifestFileInfo manifestInfo = manifestFile._1();
+      int specId = manifestInfo.partSpecId;
+
+      PartitionSpec spec = broadcastTable.value().specs().get(specId);
+      Types.StructType dataFileType = DataFile.getType(spec.partitionType());
+      SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType).withSpecId(spec.specId());
+
+      Iterable<PatchedManifestEntry> entries =
+          Streams.stream(rowIterator).map(r -> {
+            long snapshotId = r.getLong(2);
+            long sequenceNumber = r.getLong(3);
+            Row file = r.getStruct(4);
+            SparkDataFile dataFile = wrapper.wrap(file);
+            Optional<DataFile> repairedDataFile = RepairManifestHelper.repairDataFile(
+                dataFile, broadcastTable.value(), spec, conf.value().value(), options);
+            return repairedDataFile.map(
+                value -> new PatchedManifestEntry(snapshotId, sequenceNumber, value, true))
+                .orElseGet(() -> new PatchedManifestEntry(snapshotId, sequenceNumber, dataFile, false));
+          }).collect(Collectors.toList());
+      return new Tuple2<>(manifestFile._1(), entries);
+    };
+  }
+
+  private static Function<Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>, ManifestFile> writeRepairs(
+      Broadcast<FileIO> io, Broadcast<Table> broadcastTable,
+      int formatVersion, String location) {
+    return rows -> {
+      Iterator<PatchedManifestEntry> entryIterator = rows._2().iterator();
+      ManifestFileInfo manifestInfo = rows._1();
+      int specId = manifestInfo.partSpecId;
+      PartitionSpec spec = broadcastTable.value().specs().get(specId);
+
+      String manifestName = "repaired-m-" + UUID.randomUUID();
+      Path newManifestPath = new Path(location, manifestName);
+      OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(newManifestPath.toString()));

Review comment:
       Would it make sense to parallelize this as well, using Tasks.foreach?
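
       (For reference, a rough sketch of that idea, assuming Iceberg's org.apache.iceberg.util.Tasks utility and a java.util.concurrent executor; manifestsToRepair and writeRepairedManifest below are hypothetical stand-ins for the inputs and the write logic in this hunk:)

           ExecutorService pool = Executors.newFixedThreadPool(8);
           Tasks.foreach(manifestsToRepair)
               .executeWith(pool)   // run the per-manifest writes on the pool
               .stopOnFailure()     // fail fast if any manifest write fails
               .run(manifest -> writeRepairedManifest(manifest));
           pool.shutdown();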

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+  /**
+   * Given DataFile information, returns Metrics
+   * @param format file format
+   * @param status file status
+   * @param conf Hadoop configuration
+   * @param metricsSpec metrics configuration
+   * @param mapping name mapping
+   * @return metrics
+   */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFiles for potential repair
+   * @return a set of fields in human-readable format that differ between these DataFiles

Review comment:
       Although for one-liners I believe we write "Return ..." without the tag; this is so the summary field gets populated with the full description.
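
       (Applied to the method above, that convention would read:)

           /**
            * Returns a set of fields, in human-readable form, that differ between these DataFiles.
            */
           static Set<String> diff(DataFile first, DataFile second) {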




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r656731546



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation(String)}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
+    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private RepairManifests.Result doExecute() {
+    // Find matching manifest entries
+    Map<String, ManifestFile> manifestFiles = findMatchingManifests();
+    Dataset<Row> manifestEntryDf = buildManifestEntryDF(new ArrayList<>(manifestFiles.values()));
+    StructType sparkType = (StructType) manifestEntryDf.schema().apply("data_file").dataType();
+    JavaRDD<Row> manifestEntryRdd = manifestEntryDf.toJavaRDD();
+    JavaPairRDD<ManifestFileInfo, Iterable<Row>> entriesByManifest = manifestEntryRdd.groupBy(
+        r -> new ManifestFileInfo(r.getString(0), r.getInt(1)));
+
+    // Calculate manifest entries for repair
+    RepairManifestHelper.RepairOptions options = new RepairManifestHelper.RepairOptions(repairMetrics);
+    Broadcast<Table> broadcastTable = sparkContext().broadcast(SerializableTable.copyOf(getTable()));
+    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
+    JavaRDD<Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>> toRepair =
+        entriesByManifest.map(calculateRepairs(broadcastTable, conf, sparkType, options))
+            // process only manifest files with repaired entries
+            .filter(m -> StreamSupport.stream(m._2.spliterator(), false)
+                .anyMatch(p -> p.repaired));
+
+    // Write out repaired manifests
+    Broadcast<FileIO> io = sparkContext().broadcast(getFileIO());
+    JavaRDD<ManifestFile> repairedManifests = toRepair.map(
+        writeRepairs(io, broadcastTable, formatVersion, stagingLocation));
+
+    // Prepare results
+    List<ManifestFile> addedManifests = repairedManifests.collect();
+    List<ManifestFile> deletedManifests = toRepair.collect().stream().map(t -> {
+      String path = t._1().getPath();
+      ManifestFile mf = manifestFiles.get(path);
+      // Sanity check deleted file existed in original list
+      Preconditions.checkNotNull(mf, "Manifest file cannot be null for %s", path);
+      return mf;
+    }).collect(Collectors.toList());
+
+    Iterable<ManifestFile> newManifests = replaceManifests(deletedManifests, addedManifests);
+    return new BaseRepairManifestsActionResult(deletedManifests, Lists.newArrayList(newManifests));
+  }
+
+  private Map<String, ManifestFile> findMatchingManifests() {
+    Snapshot currentSnapshot = getTable().currentSnapshot();
+    if (currentSnapshot == null) {
+      return ImmutableMap.of();
+    }
+    return currentSnapshot.dataManifests().stream()
+        .filter(manifest -> predicate.test(manifest))
+        .collect(Collectors.toMap(ManifestFile::path, mf -> mf));
+  }
+
+  private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
+    Dataset<Row> manifestDF = spark()
+        .createDataset(Lists.transform(manifests, m -> new ManifestFileInfo(m.path(), m.partitionSpecId())),
+            Encoders.bean(ManifestFileInfo.class))
+        .toDF("partition_spec_id", "manifest_path");
+    Dataset<Row> manifestEntryDF = loadMetadataTable(getTable(), ENTRIES)
+        .filter("status < 2") // select only live entries
+        .selectExpr("input_file_name() as manifest_path", "snapshot_id", "sequence_number", "data_file");
+
+    return manifestEntryDF.as("manifest_entry")
+        .join(manifestDF.as("manifest"), "manifest_path")
+        .select("manifest_entry.manifest_path", "manifest.partition_spec_id",
+            "snapshot_id", "sequence_number", "data_file");
+  }
+
+  private static Function<Tuple2<ManifestFileInfo, Iterable<Row>>,
+      Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>> calculateRepairs(
+      Broadcast<Table> broadcastTable, Broadcast<SerializableConfiguration> conf,
+      StructType sparkType, RepairManifestHelper.RepairOptions options) {
+    return manifestFile -> {
+      Iterator<Row> rowIterator = manifestFile._2().iterator();
+      ManifestFileInfo manifestInfo = manifestFile._1();
+      int specId = manifestInfo.partSpecId;
+
+      PartitionSpec spec = broadcastTable.value().specs().get(specId);
+      Types.StructType dataFileType = DataFile.getType(spec.partitionType());
+      SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType).withSpecId(spec.specId());
+
+      Iterable<PatchedManifestEntry> entries =
+          Streams.stream(rowIterator).map(r -> {
+            long snapshotId = r.getLong(2);
+            long sequenceNumber = r.getLong(3);
+            Row file = r.getStruct(4);
+            SparkDataFile dataFile = wrapper.wrap(file);
+            Optional<DataFile> repairedDataFile = RepairManifestHelper.repairDataFile(
+                dataFile, broadcastTable.value(), spec, conf.value().value(), options);
+            return repairedDataFile.map(
+                value -> new PatchedManifestEntry(snapshotId, sequenceNumber, value, true))
+                .orElseGet(() -> new PatchedManifestEntry(snapshotId, sequenceNumber, dataFile, false));
+          }).collect(Collectors.toList());
+      return new Tuple2<>(manifestFile._1(), entries);
+    };
+  }
+
+  private static Function<Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>, ManifestFile> writeRepairs(
+      Broadcast<FileIO> io, Broadcast<Table> broadcastTable,
+      int formatVersion, String location) {
+    return rows -> {
+      Iterator<PatchedManifestEntry> entryIterator = rows._2().iterator();
+      ManifestFileInfo manifestInfo = rows._1();
+      int specId = manifestInfo.partSpecId;
+      PartitionSpec spec = broadcastTable.value().specs().get(specId);
+
+      String manifestName = "repaired-m-" + UUID.randomUUID();
+      Path newManifestPath = new Path(location, manifestName);
+      OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(newManifestPath.toString()));

Review comment:
       Do you mean to parallelize this call: writer.existing(e.dataFile, e.snapshotId, e.sequenceNumber)?
   
   It does not seem very thread-safe: looking through the writers in the writer stack (ManifestWriter, AvroFileAppender, DataFileWriter, GenericAvroWriter, the Avro writers..), they have a lot of mutable state, like counts, that is not safe to share across threads.
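   
   For context, a minimal sketch of the thread-safe alternative discussed here: parallelize across manifests, giving each task its own writer, rather than sharing one ManifestWriter across threads. The class and method names are illustrative, not the PR's code:
   ```java
   import java.io.IOException;
   import java.util.List;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.ManifestFile;
   import org.apache.iceberg.ManifestFiles;
   import org.apache.iceberg.ManifestWriter;
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.io.OutputFile;
   
   class ManifestWriteSketch {
     // Each call owns its writer, so the writer's internal counters are never shared.
     static ManifestFile writeSingleManifest(int formatVersion, PartitionSpec spec, OutputFile out,
                                             List<DataFile> files, long snapshotId, long sequenceNumber)
         throws IOException {
       ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, spec, out, null);
       try {
         for (DataFile file : files) {
           writer.existing(file, snapshotId, sequenceNumber);
         }
       } finally {
         writer.close();
       }
       return writer.toManifestFile();
     }
   }
   ```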






[GitHub] [iceberg] szehon-ho commented on pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#issuecomment-845015249


   Great to see you back on reviews @rdblue  :)
   
   Yes, after this first implementation, I see some advantages of having a dedicated RepairManifests action. The RewriteManifests action is compaction-oriented, so by design it cannot run across two separate partition specs, whereas RepairManifests should be able to, since it does not combine manifest files.
   
   And yes, in general I see that the two can be conceptually different, like you said. I can spend some time looking at making this a separate action and refactoring the common code into a base class, roughly as sketched below.
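   
   (A rough sketch of the shape that refactor could take, with names borrowed from this PR and bodies elided — illustrative only:)
   ```java
   // Sketch: shared manifest-replacement plumbing lives in a common base,
   // and rewrite/repair become sibling actions.
   abstract class BaseManifestSparkActionSketch<ThisT, ResultT> {
     protected abstract ThisT self();
   
     // Commit helper shared by rewrite and repair: atomically swaps
     // the old manifests for the newly written ones.
     protected abstract Iterable<org.apache.iceberg.ManifestFile> replaceManifests(
         Iterable<org.apache.iceberg.ManifestFile> deleted,
         Iterable<org.apache.iceberg.ManifestFile> added);
   }
   ```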
   




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635474280



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
##########
@@ -168,16 +182,26 @@ public RewriteManifests stagingLocation(String newStagingLocation) {
 
     Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);
 
-    List<ManifestFile> newManifests;
+    List<RepairManifestHelper.RepairedManifestFile> repairedManifests;
     if (spec.fields().size() < 1) {
-      newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
+      repairedManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
     } else {
-      newManifests = writeManifestsForPartitionedTable(manifestEntryDF, targetNumManifests, targetNumManifestEntries);
+      repairedManifests = writeManifestsForPartitionedTable(
+              manifestEntryDF, targetNumManifests, targetNumManifestEntries);
     }
 
+    List<ManifestFile> newManifests = repairedManifests.stream().map(

Review comment:
       nit:
   ```
   .stream()
   .map(RepairManifestHelper.RepairedManifestFile::manifestFile)
   .collect
   ```






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r655671577



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+    /**
+     * Given a DataFile information, return Metrics
+     * @param format file format
+     * @param status file status
+     * @param conf Hadoop configuration
+     * @param metricsSpec metrics configuration
+     * @param mapping name mapping
+     * @return metrics
+     */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFile for potential for repair
+   * @return a set of fields in human-readable format that differ between these DataFiles
+   */
+  static Set<String> diff(DataFile first, DataFile second) {
+    Set<String> result = new HashSet<>();
+    if (first.fileSizeInBytes() != second.fileSizeInBytes()) {
+      result.add("file_size_in_bytes");

Review comment:
       Done

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+    /**
+     * Given a DataFile information, return Metrics
+     * @param format file format
+     * @param status file status
+     * @param conf Hadoop configuration
+     * @param metricsSpec metrics configuration
+     * @param mapping name mapping
+     * @return metrics

Review comment:
       Removed the comment

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+    /**
+     * Given a DataFile information, return Metrics
+     * @param format file format
+     * @param status file status
+     * @param conf Hadoop configuration
+     * @param metricsSpec metrics configuration
+     * @param mapping name mapping
+     * @return metrics
+     */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,

Review comment:
       Renamed






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635471858



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
##########
@@ -91,14 +92,19 @@ public SparkDataFile wrap(Row row) {
     return this;
   }
 
+  public SparkDataFile withSpecId(int newSpecId) {

Review comment:
       Did you file the proposal to add specId to DataFile? I know I've had a few temporary PRs where I thought about doing this. We could change the reader itself so that this is always populated, and I think there is an active PR to do this somewhere as well.






[GitHub] [iceberg] flyrain commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
flyrain commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635535503



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+    /**
+     * Given a DataFile information, return Metrics
+     * @param format file format
+     * @param status file status
+     * @param conf Hadoop configuration
+     * @param metricsSpec metrics configuration
+     * @param mapping name mapping
+     * @return metrics
+     */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFile for potential for repair
+   * @return a set of fields in human-readable format that differ between these DataFiles
+   */
+  static Set<String> diff(DataFile first, DataFile second) {
+    Set<String> result = new HashSet<>();
+    if (first.fileSizeInBytes() != second.fileSizeInBytes()) {
+      result.add("file_size_in_bytes");
+    }
+    if (first.recordCount() != second.recordCount()) {
+      result.add("record_count");
+    }
+    if (!Objects.equals(first.columnSizes(), second.columnSizes())) {
+      result.add("column_sizes");
+    }
+    if (!Objects.equals(first.valueCounts(), second.valueCounts())) {
+      result.add("value_counts");
+    }
+    if (!Objects.equals(first.nullValueCounts(), second.nullValueCounts())) {
+      result.add("null_value_counts");
+    }
+    if (!Objects.equals(first.nanValueCounts(), second.nanValueCounts())) {
+      result.add("nan_value_counts");
+    }
+    if (!Objects.equals(first.lowerBounds(), second.lowerBounds())) {
+      result.add("lower_bounds");
+    }
+    if (!Objects.equals(first.upperBounds(), second.upperBounds())) {
+      result.add("upper_bounds");
+    }
+    return result;
+  }
+
+  /**
+   * Given a data file pointer, return a repaired version if actual file information does not match.
+   * @param file spark data file
+   * @param spec user-specified spec
+   * @param table table information
+   * @param conf Hadoop configuration
+   * @return A repaired DataFile if repair was done (file information did not match), or None if not
+   */
+  static Optional<RepairedDataFile> repairDataFile(SparkDataFile file,
+                                                   Table table,
+                                                   PartitionSpec spec,
+                                                   Configuration conf) {
+    DataFiles.Builder newDfBuilder = DataFiles.builder(spec).copy(file);
+    Path path = new Path(file.path().toString());
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      FileStatus status = fs.getFileStatus(path);
+      newDfBuilder.withStatus(status);
+      String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+      NameMapping nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+      newDfBuilder.withMetrics(getMetrics(file.format(), status, conf,
+          MetricsConfig.fromProperties(table.properties()), nameMapping));
+
+      DataFile newFile = newDfBuilder.build();
+      Set<String> diff = RepairManifestHelper.diff(file, newFile);
+      if (diff.isEmpty()) {
+        return Optional.empty();
+      } else {
+        return Optional.of(new RepairedDataFile(newFile, diff));
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Represents a repaired DataFile
+   */
+  public static class RepairedDataFile {

Review comment:
       Maybe put this into the class's comment as well: "Needs to be public for Spark serialization"?






[GitHub] [iceberg] rdblue commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635650393



##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteManifests.java
##########
@@ -56,6 +57,26 @@
    */
   RewriteManifests stagingLocation(String stagingLocation);
 
+
+  /**
+   * Allows reading of data files to repair them.
+   * @param mode repair mode
+   * @return this for method chaining
+   */
+  RewriteManifests repair(RepairMode mode);

Review comment:
       I generally prefer to use configuration methods rather than enums in public APIs. That would also allow us to avoid the NONE option and have the actual repair be a bit more specific, like `repairFileLengths()`. What do you guys think, @szehon-ho, @RussellSpitzer?
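   
   For illustration, the two API shapes side by side — a sketch with hypothetical method names, not an actual proposal:
   ```java
   // Sketch only: contrasting the enum-based and configuration-method shapes.
   public interface RepairApiSketch {
     // Enum-based (as in the PR): one entry point, which forces a NONE default.
     enum RepairMode { NONE, REPAIR_ENTRIES }
     RepairApiSketch repair(RepairMode mode);
   
     // Configuration-method-based (as suggested): each repair is explicit,
     // specific, and independently toggleable, with no NONE needed.
     RepairApiSketch repairFileLengths();
     RepairApiSketch repairMetrics(boolean enabled);
   }
   ```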






[GitHub] [iceberg] rdblue commented on pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#issuecomment-844572271


   @szehon-ho, @RussellSpitzer, I'm debating whether I agree with the choice to add repair operations to the rewrite manifests action. I think it's very different and significantly more expensive to touch each data file. I think it would make sense for repairs to be done by a repair action. We can share a lot of the implementation, but I think it makes more sense to have an action dedicated to this.




[GitHub] [iceberg] flyrain commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
flyrain commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635534474



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+    /**
+     * Given a DataFile information, return Metrics
+     * @param format file format
+     * @param status file status
+     * @param conf Hadoop configuration
+     * @param metricsSpec metrics configuration
+     * @param mapping name mapping
+     * @return metrics
+     */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFile for potential for repair
+   * @return a set of fields in human-readable format that differ between these DataFiles

Review comment:
       @RussellSpitzer, checkstyle didn't report any issue here, but the param comments are missing.






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r636016830



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
##########
@@ -86,26 +91,29 @@
   private static final String USE_CACHING = "use-caching";
   private static final boolean USE_CACHING_DEFAULT = true;
 
-  private final Encoder<ManifestFile> manifestEncoder;
+  private final Encoder<RepairManifestHelper.RepairedManifestFile> manifestEncoder;
   private final Table table;
   private final int formatVersion;
   private final FileIO fileIO;
+  private final SerializableConfiguration hadoopConf;
   private final long targetManifestSizeBytes;
 
   private PartitionSpec spec = null;
   private Predicate<ManifestFile> predicate = manifest -> true;
   private String stagingLocation = null;
+  private RepairMode mode = RepairMode.NONE;
 
   public BaseRewriteManifestsSparkAction(SparkSession spark, Table table) {
     super(spark);
-    this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
+    this.manifestEncoder = Encoders.javaSerialization(RepairManifestHelper.RepairedManifestFile.class);

Review comment:
       OK, yes, I was trying to think of something useful to return to the user. But maybe it's not very useful, since it's not specific (as per the discussion with @flyrain earlier in the review), and returning all patched manifest entries is a bit overkill. I'm OK with just returning the list of repaired ManifestFiles.
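   
   (For instance, a minimal result shape along those lines — a sketch, not the PR's actual interface:)
   ```java
   import java.util.List;
   import org.apache.iceberg.ManifestFile;
   
   // Sketch: report only which manifests were rewritten and added,
   // without per-entry repair details.
   interface RepairManifestsResultSketch {
     List<ManifestFile> rewrittenManifests();
     List<ManifestFile> addedManifests();
   }
   ```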






[GitHub] [iceberg] flyrain commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
flyrain commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r634826758



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
##########
@@ -86,26 +91,29 @@
   private static final String USE_CACHING = "use-caching";
   private static final boolean USE_CACHING_DEFAULT = true;
 
-  private final Encoder<ManifestFile> manifestEncoder;
+  private final Encoder<RepairManifestHelper.RepairedManifestFile> manifestEncoder;
   private final Table table;
   private final int formatVersion;
   private final FileIO fileIO;
+  private final SerializableConfiguration hadoopConf;
   private final long targetManifestSizeBytes;
 
   private PartitionSpec spec = null;
   private Predicate<ManifestFile> predicate = manifest -> true;
   private String stagingLocation = null;
+  private RepairMode mode = RepairMode.NONE;

Review comment:
       "mode" -> "repairMode", make it more descriptive?

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+    /**
+     * Given a DataFile information, return Metrics
+     * @param format file format
+     * @param status file status
+     * @param conf Hadoop configuration
+     * @param metricsSpec metrics configuration
+     * @param mapping name mapping
+     * @return metrics
+     */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFile for potential for repair
+   * @return a set of fields in human-readable format that differ between these DataFiles
+   */
+  static Set<String> diff(DataFile first, DataFile second) {
+    Set<String> result = new HashSet<>();
+    if (first.fileSizeInBytes() != second.fileSizeInBytes()) {
+      result.add("file_size_in_bytes");
+    }
+    if (first.recordCount() != second.recordCount()) {
+      result.add("record_count");
+    }
+    if (!Objects.equals(first.columnSizes(), second.columnSizes())) {
+      result.add("column_sizes");
+    }
+    if (!Objects.equals(first.valueCounts(), second.valueCounts())) {
+      result.add("value_counts");
+    }
+    if (!Objects.equals(first.nullValueCounts(), second.nullValueCounts())) {
+      result.add("null_value_counts");
+    }
+    if (!Objects.equals(first.nanValueCounts(), second.nanValueCounts())) {
+      result.add("nan_value_counts");
+    }
+    if (!Objects.equals(first.lowerBounds(), second.lowerBounds())) {
+      result.add("lower_bounds");
+    }
+    if (!Objects.equals(first.upperBounds(), second.upperBounds())) {
+      result.add("upper_bounds");
+    }
+    return result;
+  }
+
+  /**
+   * Given a data file pointer, return a repaired version if actual file information does not match.
+   * @param file spark data file
+   * @param spec user-specified spec
+   * @param table table information
+   * @param conf Hadoop configuration
+   * @return A repaired DataFile if repair was done (file information did not match), or None if not
+   */
+  static Optional<RepairedDataFile> repairDataFile(SparkDataFile file,
+                                                   Table table,
+                                                   PartitionSpec spec,
+                                                   Configuration conf) {
+    DataFiles.Builder newDfBuilder = DataFiles.builder(spec).copy(file);
+    Path path = new Path(file.path().toString());
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      FileStatus status = fs.getFileStatus(path);
+      newDfBuilder.withStatus(status);
+      String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+      NameMapping nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+      newDfBuilder.withMetrics(getMetrics(file.format(), status, conf,
+          MetricsConfig.fromProperties(table.properties()), nameMapping));
+
+      DataFile newFile = newDfBuilder.build();
+      Set<String> diff = RepairManifestHelper.diff(file, newFile);

Review comment:
       A nit: RepairManifestHelper.diff -> diff

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
##########
@@ -86,26 +91,29 @@
   private static final String USE_CACHING = "use-caching";
   private static final boolean USE_CACHING_DEFAULT = true;
 
-  private final Encoder<ManifestFile> manifestEncoder;
+  private final Encoder<RepairManifestHelper.RepairedManifestFile> manifestEncoder;
   private final Table table;
   private final int formatVersion;
   private final FileIO fileIO;
+  private final SerializableConfiguration hadoopConf;
   private final long targetManifestSizeBytes;
 
   private PartitionSpec spec = null;
   private Predicate<ManifestFile> predicate = manifest -> true;
   private String stagingLocation = null;
+  private RepairMode mode = RepairMode.NONE;

Review comment:
       Do we need a list of modes here? IIUC, we are trying to add more modes, and users should be able to select one or many of them, e.g. as sketched below.
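   
   (One way multiple modes could be combined, if the enum approach stays — an illustrative sketch, not the PR's code:)
   ```java
   import java.util.EnumSet;
   import java.util.Set;
   
   // Sketch: accept a set of modes so callers can combine repairs in one run.
   class RepairModesSketch {
     enum RepairMode { REPAIR_ENTRIES, REMOVE_DELETED_FILES, ADD_NEW_DATA_FILES }
   
     private final Set<RepairMode> modes = EnumSet.noneOf(RepairMode.class);
   
     RepairModesSketch repair(RepairMode... requested) {
       for (RepairMode mode : requested) {
         modes.add(mode);
       }
       return this;
     }
   
     boolean enabled(RepairMode mode) {
       return modes.contains(mode);
     }
   }
   ```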

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+    /**
+     * Given a DataFile information, return Metrics
+     * @param format file format
+     * @param status file status
+     * @param conf Hadoop configuration
+     * @param metricsSpec metrics configuration
+     * @param mapping name mapping
+     * @return metrics

Review comment:
       The param comments are out of order. Do we even need them? The names already tell the story.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
##########
@@ -310,37 +341,54 @@ private void deleteFiles(Iterable<String> locations) {
         .run(fileIO::deleteFile);
   }
 
-  private static ManifestFile writeManifest(
+  private static RepairManifestHelper.RepairedManifestFile writeManifest(
       List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+      String location, int format, PartitionSpec spec, StructType sparkType,
+      RepairMode mode, Broadcast<Table> broadcastTable, Broadcast<SerializableConfiguration> conf) throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
     Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType).withSpecId(spec.specId());
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
+    Set<String> repairedColumns = new HashSet<String>();
 
     try {
       for (int index = startIndex; index < endIndex; index++) {
         Row row = rows.get(index);
         long snapshotId = row.getLong(0);
         long sequenceNumber = row.getLong(1);
         Row file = row.getStruct(2);
-        writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber);
+        SparkDataFile dataFile = wrapper.wrap(file);
+        if (mode == RepairMode.REPAIR_ENTRIES) {
+          Optional<RepairManifestHelper.RepairedDataFile> repaired =
+                  RepairManifestHelper.repairDataFile(dataFile, broadcastTable.value(), spec, conf.value().value());
+          if (repaired.isPresent()) {
+            repairedColumns.addAll(repaired.get().repairedFields());

Review comment:
       Is it readable if we put the fields of multiple files in one set? Are we going to distinguish the diffs for each data file? See the sketch below.
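   
   (One way to keep the per-file detail, sketched with hypothetical names — not what the PR currently does:)
   ```java
   import java.util.Map;
   import java.util.Set;
   import org.apache.iceberg.relocated.com.google.common.collect.Maps;
   
   // Sketch: key the repaired fields by data file path, so the action's
   // result can say which columns changed for which file.
   class RepairDiffSketch {
     private final Map<String, Set<String>> repairedFieldsByPath = Maps.newHashMap();
   
     void record(String filePath, Set<String> repairedFields) {
       repairedFieldsByPath.put(filePath, repairedFields);
     }
   
     Map<String, Set<String>> repairedFieldsByPath() {
       return repairedFieldsByPath;
     }
   }
   ```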

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+    /**
+     * Given a DataFile information, return Metrics
+     * @param format file format
+     * @param status file status
+     * @param conf Hadoop configuration
+     * @param metricsSpec metrics configuration
+     * @param mapping name mapping
+     * @return metrics
+     */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFile for potential for repair
+   * @return a set of fields in human-readable format that differ between these DataFiles
+   */
+  static Set<String> diff(DataFile first, DataFile second) {
+    Set<String> result = new HashSet<>();
+    if (first.fileSizeInBytes() != second.fileSizeInBytes()) {
+      result.add("file_size_in_bytes");

Review comment:
       "file_size_in_bytes" -> FILE_SIZE.name() ? 
   This applies to the following strings as well.
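   
   (That is, the labels could come from DataFile's schema field constants instead of re-typed literals — a sketch:)
   ```java
   import java.util.Set;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.relocated.com.google.common.collect.Sets;
   
   // Sketch: DataFile.FILE_SIZE.name() evaluates to "file_size_in_bytes",
   // so the diff labels cannot drift from the spec'd column names.
   class DiffLabelsSketch {
     static Set<String> diffLabels(DataFile first, DataFile second) {
       Set<String> result = Sets.newHashSet();
       if (first.fileSizeInBytes() != second.fileSizeInBytes()) {
         result.add(DataFile.FILE_SIZE.name());
       }
       if (first.recordCount() != second.recordCount()) {
         result.add(DataFile.RECORD_COUNT.name());
       }
       return result;
     }
   }
   ```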






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635458680



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+  /**
+   * Given information about a data file, returns its Metrics.
+   * @param format file format
+   * @param status file status
+   * @param conf Hadoop configuration
+   * @param metricsSpec metrics configuration
+   * @param mapping name mapping
+   * @return metrics

Review comment:
       Pretty sure our checkstyle will be mad if they are missing
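
   For reference, a sketch of a javadoc shape that keeps checkstyle happy (the wording here is just an example):

       /**
        * Returns metrics read from a data file.
        *
        * @param format file format
        * @param status file status
        * @param conf Hadoop configuration
        * @param metricsSpec metrics configuration
        * @param mapping name mapping
        * @return metrics for the file
        */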






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r655748598



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+  /**
+   * Given information about a data file, returns its Metrics.
+   * @param format file format
+   * @param status file status
+   * @param conf Hadoop configuration
+   * @param metricsSpec metrics configuration
+   * @param mapping name mapping
+   * @return metrics
+   */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFiles for potential repair
+   * @return a set of fields in human-readable format that differ between these DataFiles

Review comment:
       Although for one-liners I believe we write

       Returns ...

   without the @return tag, so that the summary field gets populated with the full description.
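
   e.g., a sketch of the one-liner style for this method:

       /**
        * Returns the set of fields, in human-readable form, that differ between the two data files.
        */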






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r656729884



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
+    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private RepairManifests.Result doExecute() {
+    // Find matching manifest entries
+    Map<String, ManifestFile> manifestFiles = findMatchingManifests();
+    Dataset<Row> manifestEntryDf = buildManifestEntryDF(new ArrayList<>(manifestFiles.values()));
+    StructType sparkType = (StructType) manifestEntryDf.schema().apply("data_file").dataType();
+    JavaRDD<Row> manifestEntryRdd = manifestEntryDf.toJavaRDD();
+    JavaPairRDD<ManifestFileInfo, Iterable<Row>> entriesByManifest = manifestEntryRdd.groupBy(
+        r -> new ManifestFileInfo(r.getString(0), r.getInt(1)));
+
+    // Calculate manifest entries for repair
+    RepairManifestHelper.RepairOptions options = new RepairManifestHelper.RepairOptions(repairMetrics);
+    Broadcast<Table> broadcastTable = sparkContext().broadcast(SerializableTable.copyOf(getTable()));
+    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
+    JavaRDD<Tuple2<ManifestFileInfo, Iterable<PatchedManifestEntry>>> toRepair =
+        entriesByManifest.map(calculateRepairs(broadcastTable, conf, sparkType, options))
+            // Process only manifest files with repaired entries
+        .filter(m -> StreamSupport.stream(m._2.spliterator(), false)
+            .anyMatch(p -> p.repaired));
+
+    // Write out repaired manifests
+    Broadcast<FileIO> io = sparkContext().broadcast(getFileIO());
+    JavaRDD<ManifestFile> repairedManifests = toRepair.map(
+        writeRepairs(io, broadcastTable, formatVersion, stagingLocation));
+
+    // Prepare results
+    List<ManifestFile> addedManifests = repairedManifests.collect();
+    List<ManifestFile> deletedManifests = toRepair.collect().stream().map(t -> {
+      String path = t._1().getPath();
+      ManifestFile mf = manifestFiles.get(path);
+      // Sanity check deleted file existed in original list
+      Preconditions.checkNotNull(mf, "Manifest file cannot be null for " + path);
+      return mf;
+    }).collect(Collectors.toList());
+
+    Iterable<ManifestFile> newManifests = replaceManifests(deletedManifests, addedManifests);
+    return new BaseRepairManifestsActionResult(deletedManifests, Lists.newArrayList(newManifests));
+  }
+
+
+  private Map<String, ManifestFile> findMatchingManifests() {
+    Snapshot currentSnapshot = getTable().currentSnapshot();
+    if (currentSnapshot == null) {
+      return ImmutableMap.of();
+    }
+    return currentSnapshot.dataManifests().stream()
+        .filter(manifest -> predicate.test(manifest))
+        .collect(Collectors.toMap(ManifestFile::path, mf -> mf));
+  }
+
+  private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
+    Dataset<Row> manifestDF = spark()
+        .createDataset(Lists.transform(manifests, m -> new ManifestFileInfo(m.path(), m.partitionSpecId())),
+            Encoders.bean(ManifestFileInfo.class))
+        .toDF("partition_spec_id", "manifest_path");
+    Dataset<Row> manifestEntryDF = loadMetadataTable(getTable(), ENTRIES)
+        .filter("status < 2") // select only live entries
+        .selectExpr("input_file_name() as manifest_path", "snapshot_id", "sequence_number", "data_file");
+
+    return manifestEntryDF.as("manifest_entry")

Review comment:
       I see, I will file a ticket for that if none exists.  What is the policy for adding to metadata table schemas, i.e., do you know if we have a backward-compatibility policy?






[GitHub] [iceberg] flyrain commented on pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
flyrain commented on pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#issuecomment-844426726


   > @flyrain thanks for the review, so to understand, you would prefer a result of Map of individual manifest-entry changes instead of a summary of manifest-files changed? I was thinking that but was fearing it would be too big of a result.
   
   Yes. A Map works here, and your concern is valid: the size varies dramatically. For a table with 1 TB of data and an average file size of 256 MB, we get roughly 1,000,000 / 256 ≈ 4,000 data files; at about 100 bytes per entry that is only around 400 KB, but the same math on a petabyte-scale table gives about 4,000,000 files and 400 MB of result data, which sounds like too much to me as well.
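
   Spelling the estimate out (all numbers illustrative):

       files  = table_size / avg_file_size
       result = files * bytes_per_entry

       1 TB / 256 MB ≈ 4,000 files     -> 4,000 * 100 B     ≈ 400 KB
       1 PB / 256 MB ≈ 4,000,000 files -> 4,000,000 * 100 B ≈ 400 MB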
   
   In that sense, I'm OK with the current implementation; we can think about a different way to handle the future requirement.
   
   




[GitHub] [iceberg] szehon-ho commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r656729615



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());

Review comment:
       Done

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
+    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);

Review comment:
       Done






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635469963



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
##########
@@ -78,6 +78,16 @@ public RewriteManifestsAction useCaching(boolean newUseCaching) {
     return this;
   }
 
+  /**
+   * Allows reading of data files to repair them.
+   * @param mode repair mode
+   * @return this for method chaining
+   */
+  public RewriteManifestsAction repair(org.apache.iceberg.actions.RewriteManifests.RepairMode mode) {

Review comment:
       I don't think you need to update this version since it's deprecated






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2608: Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r634748657



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/RepairManifestHelper.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkExceptionUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Helper methods to repair manifest files.
+ * TODO- repair split offsets
+ */
+public class RepairManifestHelper {
+
+  private RepairManifestHelper() {
+    // Prevent Construction
+  }
+
+  /**
+   * Given information about a data file, returns its Metrics.
+   * @param format file format
+   * @param status file status
+   * @param conf Hadoop configuration
+   * @param metricsSpec metrics configuration
+   * @param mapping name mapping
+   * @return metrics
+   */
+  private static Metrics getMetrics(FileFormat format, FileStatus status, Configuration conf,
+                                    MetricsConfig metricsSpec, NameMapping mapping) {
+    switch (format) {
+      case AVRO:
+        return new Metrics(-1L, null, null, null);
+      case ORC:
+        return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(status.getPath(), conf),
+          metricsSpec, mapping);
+      case PARQUET:
+        try {
+          ParquetMetadata metadata = ParquetFileReader.readFooter(conf, status);
+          return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+        } catch (IOException e) {
+          throw SparkExceptionUtil.toUncheckedException(
+            e, "Unable to read the footer of the parquet file: %s", status.getPath());
+        }
+      default:
+        throw new UnsupportedOperationException("Unknown file format: " + format);
+    }
+  }
+
+  /**
+   * Diffs two DataFiles for potential repair
+   * @return a set of fields in human-readable format that differ between these DataFiles
+   */
+  static Set<String> diff(DataFile first, DataFile second) {
+    Set<String> result = new HashSet<>();
+    if (first.fileSizeInBytes() != second.fileSizeInBytes()) {
+      result.add("file_size_in_bytes");
+    }
+    if (first.recordCount() != second.recordCount()) {
+      result.add("record_count");
+    }
+    if (!Objects.equals(first.columnSizes(), second.columnSizes())) {
+      result.add("column_sizes");
+    }
+    if (!Objects.equals(first.valueCounts(), second.valueCounts())) {
+      result.add("value_counts");
+    }
+    if (!Objects.equals(first.nullValueCounts(), second.nullValueCounts())) {
+      result.add("null_value_counts");
+    }
+    if (!Objects.equals(first.nanValueCounts(), second.nanValueCounts())) {
+      result.add("nan_value_counts");
+    }
+    if (!Objects.equals(first.lowerBounds(), second.lowerBounds())) {
+      result.add("lower_bounds");
+    }
+    if (!Objects.equals(first.upperBounds(), second.upperBounds())) {
+      result.add("upper_bounds");
+    }
+    return result;
+  }
+
+  /**
+   * Given a data file pointer, return a repaired version if actual file information does not match.
+   * @param file spark data file
+   * @param spec user-specified spec
+   * @param table table information
+   * @param conf Hadoop configuration
+   * @return A repaired DataFile if repair was done (file information did not match), or None if not
+   */
+  static Optional<RepairedDataFile> repairDataFile(SparkDataFile file,
+                                                   Table table,
+                                                   PartitionSpec spec,
+                                                   Configuration conf) {
+    DataFiles.Builder newDfBuilder = DataFiles.builder(spec).copy(file);
+    Path path = new Path(file.path().toString());
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      FileStatus status = fs.getFileStatus(path);
+      newDfBuilder.withStatus(status);
+      String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+      NameMapping nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+      newDfBuilder.withMetrics(getMetrics(file.format(), status, conf,
+          MetricsConfig.fromProperties(table.properties()), nameMapping));
+
+      DataFile newFile = newDfBuilder.build();
+      Set<String> diff = RepairManifestHelper.diff(file, newFile);
+      if (diff.isEmpty()) {
+        return Optional.empty();
+      } else {
+        return Optional.of(new RepairedDataFile(newFile, diff));
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Represents a repaired DataFile
+   */
+  public static class RepairedDataFile {

Review comment:
       Needs to be public for Spark serialization
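
   Roughly: the instance is created inside the map closure on executors and shipped back with the task results, so it has to be serializable and visible to Spark's serializer. A minimal sketch of the shape (accessors and field set assumed):

       public static class RepairedDataFile implements Serializable {
         private final DataFile repaired;           // entry rebuilt from the file status/footer
         private final Set<String> repairedFields;  // names of the fields that were corrected

         RepairedDataFile(DataFile repaired, Set<String> repairedFields) {
           this.repaired = repaired;
           this.repairedFields = repairedFields;
         }
       }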






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635470045



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsActionResult.java
##########
@@ -27,20 +27,26 @@
 public class RewriteManifestsActionResult {

Review comment:
       Also deprecated






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r655739194



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRepairManifestsSparkAction.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.actions.BaseRepairManifestsActionResult;
+import org.apache.iceberg.actions.RepairManifests;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Tuple2;
+
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+
+/**
+ * An action that repairs manifests in a distributed manner, reading metadata of the data files to repair
+ * manifest entries in each manifest.
+ * <p>
+ * By default, this action repairs all manifests by checking the data file status and reading the data file headers.
+ * This can be changed by passing in repair options like {@link #repairMetrics(boolean)}, and manifest predicates
+ * via {@link #repairIf(Predicate)}.
+ * <p>
+ * In addition, there is a way to configure a custom location for new manifests via {@link #stagingLocation}.
+ */
+public class BaseRepairManifestsSparkAction extends BaseManifestSparkAction<RepairManifests, RepairManifests.Result>
+    implements RepairManifests {
+
+  private final int formatVersion;
+  private final SerializableConfiguration hadoopConf;
+
+  private Predicate<ManifestFile> predicate = manifest -> true;
+  private String stagingLocation;
+  private boolean repairMetrics = true;
+
+  public BaseRepairManifestsSparkAction(SparkSession spark, Table table) {
+    super(spark, table);
+
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+
+    // default the staging location to the metadata location
+    TableOperations ops = ((HasTableOperations) getTable()).operations();
+    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
+    this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for repaired manifests
+    this.formatVersion = ops.current().formatVersion();
+  }
+
+  @Override
+  public RepairManifests repairIf(Predicate<ManifestFile> newPredicate) {
+    this.predicate = newPredicate;
+    return this;
+  }
+
+  @Override
+  public RepairManifests stagingLocation(String newStagingLocation) {
+    this.stagingLocation = newStagingLocation;
+    return this;
+  }
+
+  @Override
+  public RepairManifests repairMetrics(boolean repair) {
+    this.repairMetrics = repair;
+    return this;
+  }
+
+  @Override
+  protected RepairManifests self() {
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, getTable().name());
+    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);

Review comment:
       REPAIR-MANIFESTS
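
   i.e.:

       String desc = String.format("Repairing manifests (staging location=%s) of %s",
           stagingLocation, getTable().name());
       JobGroupInfo info = newJobGroupInfo("REPAIR-MANIFESTS", desc);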






[GitHub] [iceberg] rdblue commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635649333



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteManifestsActionResult.java
##########
@@ -46,4 +51,31 @@ public BaseRewriteManifestsActionResult(Iterable<ManifestFile> rewrittenManifest
   public Iterable<ManifestFile> addedManifests() {
     return addedManifests;
   }
+
+  @Override
+  public Iterable<RepairedManifest> repairedManifests() {
+    return (Iterable<RepairedManifest>) repairedManifests;
+  }
+
+  public static class BaseRepairedManifestFile implements RewriteManifests.Result.RepairedManifest {

Review comment:
       The implementation class shouldn't be public.






[GitHub] [iceberg] rdblue commented on a change in pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#discussion_r635652898



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
##########
@@ -310,37 +341,54 @@ private void deleteFiles(Iterable<String> locations) {
         .run(fileIO::deleteFile);
   }
 
-  private static ManifestFile writeManifest(
+  private static RepairManifestHelper.RepairedManifestFile writeManifest(
       List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+      String location, int format, PartitionSpec spec, StructType sparkType,
+      RepairMode mode, Broadcast<Table> broadcastTable, Broadcast<SerializableConfiguration> conf) throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
     Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType).withSpecId(spec.specId());
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
+    Set<String> repairedColumns = new HashSet<String>();
 
     try {
       for (int index = startIndex; index < endIndex; index++) {
         Row row = rows.get(index);
         long snapshotId = row.getLong(0);
         long sequenceNumber = row.getLong(1);
         Row file = row.getStruct(2);
-        writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber);
+        SparkDataFile dataFile = wrapper.wrap(file);
+        if (mode == RepairMode.REPAIR_ENTRIES) {
+          Optional<RepairManifestHelper.RepairedDataFile> repaired =
+                  RepairManifestHelper.repairDataFile(dataFile, broadcastTable.value(), spec, conf.value().value());

Review comment:
       Nit: indentation is off.
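
   With the usual continuation indent:

       Optional<RepairManifestHelper.RepairedDataFile> repaired =
           RepairManifestHelper.repairDataFile(dataFile, broadcastTable.value(), spec, conf.value().value());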






[GitHub] [iceberg] szehon-ho edited a comment on pull request #2608: Core : Repair manifests

Posted by GitBox <gi...@apache.org>.
szehon-ho edited a comment on pull request #2608:
URL: https://github.com/apache/iceberg/pull/2608#issuecomment-865317271


   As per the discussion that this deserves to be its own action rather than part of RewriteManifests, I completely rewrote RepairManifests as a separate Spark action (BaseRepairManifestsSparkAction) and extracted the logic it shares with BaseRewriteManifestsSparkAction into a base class: BaseManifestSparkAction.
   
   In summary, the action distributes the repair: it first groups all entries by ManifestFile, then calculates what needs to be repaired for each entry by reading various aspects of the data file the entry points to, and finally writes all the entries back out if any of them needed repair (the repaired manifest file retains the same number of entries).
   
   Not all logic can be shared.  In the repair path, the specId is read from the original manifest file and kept around to write the repaired manifest file (rather than being passed in).
   
   There is also a problem I noticed: the ManifestFiles returned by the RewriteManifests action are wrong if "snapshotIdInheritanceEnabled" is false (since that path rewrites the manifest file to a final location).  I fixed the method while extracting it from BaseRewriteManifestsSparkAction into the new base.  (A subsequent change can address this in RewriteManifests itself and add a test.)
   
   @rdblue @aokolnychyi @flyrain @RussellSpitzer if you guys have time for another look
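
   For illustration, usage looks roughly like this (the exact entry point on SparkActions is an assumption, pending review):

       RepairManifests.Result result = SparkActions.get()
           .repairManifests(table)                   // entry point name assumed
           .repairIf(m -> m.partitionSpecId() == 0)  // optional manifest predicate
           .repairMetrics(true)                      // re-read file footers to fix metrics
           .execute();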

