Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/02 04:22:28 UTC

[GitHub] [iceberg] szehon-ho opened a new pull request #2779: Spark : Add duplicate file check in add_files

szehon-ho opened a new pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779


   This is a fix for the problem discussed in https://github.com/apache/iceberg/issues/2735.
   
   The change looks big, but it can be summarised as follows:
   
   * Add a check_duplicate_files flag to the "Add Files Procedure", true by default (it can be switched off)
   * Add a checkDuplicateFiles flag to SparkTableUtil.importSparkPartitions
   * Implement the check by loading the "files" metadata table and joining it with the files that will be imported (in a Spark job)
   * Move the metadata table methods from BaseSparkActions to SparkTableUtil so both can load the metadata table
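
   The duplicate check described in the list above can be sketched roughly as follows. This is only an in-memory analogue in plain Java, not the Spark join used in the PR: the class name, method name, and sample paths are hypothetical, and a HashSet lookup stands in for the join against the metadata table. The cap of 10 reported paths mirrors the head(10) call in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory analogue of the duplicate check; the PR does this as a Spark join.
final class DuplicateFileCheckSketch {
  // Report at most this many duplicates, mirroring head(10) in the diff.
  private static final int MAX_REPORTED = 10;

  // Returns up to MAX_REPORTED imported paths that are already present in the target table.
  static List<String> findDuplicates(List<String> importedPaths, Set<String> existingPaths) {
    List<String> duplicates = new ArrayList<>();
    for (String path : importedPaths) {
      if (existingPaths.contains(path)) {
        duplicates.add(path);
        if (duplicates.size() == MAX_REPORTED) {
          break;
        }
      }
    }
    return duplicates;
  }

  public static void main(String[] args) {
    // Sample paths are made up for illustration.
    Set<String> existing = new HashSet<>(Arrays.asList("s3://bucket/a.parquet", "s3://bucket/b.parquet"));
    List<String> imported = Arrays.asList("s3://bucket/b.parquet", "s3://bucket/c.parquet");
    System.out.println("Duplicates: " + findDuplicates(imported, existing));
  }
}
```

   In the real change the existing paths come from a metadata table and can be large, which is presumably why the comparison runs as a Spark job rather than in driver memory.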
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r669320309



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
+            .toDF("file_path");
+
+        Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES);
+        Column joinCond = existingFiles.col("data_file.file_path").equalTo(fileDs.col("file_path"));
+        Dataset<Row> duplicates = fileDs.join(existingFiles, joinCond);
+        List<String> duplicateList = Arrays.stream((Row[]) duplicates.head(10))

Review comment:
       Yeah, that was my first version, but I hit #2783 as I mentioned above (spent a good half day on it and couldn't find an easy fix).






[GitHub] [iceberg] szehon-ho edited a comment on pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho edited a comment on pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#issuecomment-874478384


   Looked a bit into the pruning issue found by the original implementation, reported here: https://github.com/apache/iceberg/issues/2783
   
   The latest changes eliminate the projection so that the test passes.




[GitHub] [iceberg] szehon-ho commented on pull request #2779: Spark : Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#issuecomment-873350843


   Tests are failing due to new pruning issues on metadata tables; trying to track them down.




[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r671591451



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
+            .toDF("file_path");
+
+        Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES);
+        Column joinCond = existingFiles.col("data_file.file_path").equalTo(fileDs.col("file_path"));
+        Dataset<Row> duplicates = fileDs.join(existingFiles, joinCond);
+        List<String> duplicateList = Arrays.stream((Row[]) duplicates.head(10))

Review comment:
       Yeah, and before that it was doing 'duplicates.select("file_path").head(10)' and still failed. I think the pruning issue got worse in 3.1 (I wrote some tests in #2783). Or was there another select you had in mind?






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r665738618



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
+            .toDF("file_path");

Review comment:
       This formatting is a little odd I think






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r671541294



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
+            .toDF("file_path");
+
+        Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES);
+        Column joinCond = existingFiles.col("data_file.file_path").equalTo(fileDs.col("file_path"));
+        Dataset<Row> duplicates = fileDs.join(existingFiles, joinCond);
+        List<String> duplicateList = Arrays.stream((Row[]) duplicates.head(10))

Review comment:
       Whoops missed that part of the comment.  You are right, done.






[GitHub] [iceberg] szehon-ho commented on pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#issuecomment-895653892


   @RussellSpitzer changed this to use select after your fix, if you want to take another look.




[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r669319929



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(

Review comment:
       Done

##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
+            .toDF("file_path");

Review comment:
       Fixed






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r664251421



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -355,9 +366,11 @@ public boolean isDefinedAt(Expression attr) {
    * @param targetTable an Iceberg table where to import the data
    * @param stagingDir a staging directory to store temporary manifest files
    * @param partitionFilter only import partitions whose values match those in the map, can be partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
    */
   public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
-                                      String stagingDir, Map<String, String> partitionFilter) {
+                                      String stagingDir, Map<String, String> partitionFilter,
+                                      boolean checkDuplicateFiles) {

Review comment:
       Done








[GitHub] [iceberg] RussellSpitzer merged pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer merged pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779


   




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r665738303



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(

Review comment:
       Actually, you use existingFiles below, which is a good name, so maybe importedFiles here?






[GitHub] [iceberg] szehon-ho commented on pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#issuecomment-874478384


   The failure found by the original implementation is reported here: https://github.com/apache/iceberg/issues/2783
   
   The latest changes eliminate the projection so that the test passes.




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r671392024



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
+            .toDF("file_path");
+
+        Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES);
+        Column joinCond = existingFiles.col("data_file.file_path").equalTo(fileDs.col("file_path"));
+        Dataset<Row> duplicates = fileDs.join(existingFiles, joinCond);
+        List<String> duplicateList = Arrays.stream((Row[]) duplicates.head(10))

Review comment:
       I still think it would be easier to do .takeAsList(10) instead of the array conversion but nbd there






[GitHub] [iceberg] szehon-ho edited a comment on pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho edited a comment on pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#issuecomment-874478384


   Looked a bit into the pruning issue found by the original implementation, reported here: https://github.com/apache/iceberg/issues/2783
   
   The latest changes eliminate the projection to get this to work.




[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r684556234



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
+            .toDF("file_path");
+
+        Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES);
+        Column joinCond = existingFiles.col("data_file.file_path").equalTo(fileDs.col("file_path"));
+        Dataset<Row> duplicates = fileDs.join(existingFiles, joinCond);
+        List<String> duplicateList = Arrays.stream((Row[]) duplicates.head(10))

Review comment:
       Finally changed this, after you merged fix for #2783






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r669320457



##########
File path: api/src/main/java/org/apache/iceberg/exceptions/DuplicateFileException.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.exceptions;
+
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * This exception indicates that duplicate data files will be added to a table.
+ */
+public class DuplicateFileException extends RuntimeException {

Review comment:
       Replaced with Preconditions.checkState
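
       For illustration, the shape of that replacement might look like the sketch below. It is an assumption-laden stand-in, not the code in the PR: the local checkState helper imitates Guava's Preconditions.checkState(boolean, String, Object...) using String.format so the example needs no dependencies, and the class name, method name, and message text are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: fail with IllegalStateException instead of a dedicated exception class.
final class CheckStateSketch {
  // Local stand-in for Guava's Preconditions.checkState; like Guava, it throws IllegalStateException.
  static void checkState(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalStateException(String.format(template, args));
    }
  }

  // Throws if any duplicate paths were found; the message wording is made up for this sketch.
  static void failOnDuplicates(List<String> duplicates) {
    checkState(duplicates.isEmpty(),
        "Duplicate data files would be added to this table: %s", String.join(",", duplicates));
  }

  public static void main(String[] args) {
    failOnDuplicates(Collections.emptyList()); // no duplicates, so nothing is thrown
    try {
      failOnDuplicates(Arrays.asList("s3://bucket/b.parquet"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

       One consequence of this choice is that callers can no longer catch a dedicated exception type and have to rely on the message to tell a duplicate-file failure apart from other state errors.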






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r671376896



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
+            .toDF("file_path");
+
+        Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES);
+        Column joinCond = existingFiles.col("data_file.file_path").equalTo(fileDs.col("file_path"));
+        Dataset<Row> duplicates = fileDs.join(existingFiles, joinCond);
+        List<String> duplicateList = Arrays.stream((Row[]) duplicates.head(10))

Review comment:
       oh yeah .. :( the fix I think would be to select a column to prevent the pruning issue ... which would be hacky. I guess we should really fix that pruning issue :(






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r665741619



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -516,6 +590,48 @@ private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
         .run(item -> io.deleteFile(item.path()));
   }
 
+  // Attempt to use Spark3 Catalog resolution if available on the path

Review comment:
       This is all just moved verbatim out of BaseSparkAction right?










[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r674951900



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -95,11 +102,16 @@
  */
 public class SparkTableUtil {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SparkTableUtil.class);
+
   private static final Joiner.MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");
 
   private static final PathFilter HIDDEN_PATH_FILTER =
       p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
 
+  private static final String duplicateFileMessage = "Duplicate data files will be added to this table: %s.  " +
+      "Enable ignore.duplicates flag to avoid this error";

Review comment:
       This is no longer the name of the parameter, right?






[GitHub] [iceberg] kbendick commented on a change in pull request #2779: Spark : Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r663265183



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -355,9 +366,11 @@ public boolean isDefinedAt(Expression attr) {
    * @param targetTable an Iceberg table where to import the data
    * @param stagingDir a staging directory to store temporary manifest files
    * @param partitionFilter only import partitions whose values match those in the map, can be partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
    */
   public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
-                                      String stagingDir, Map<String, String> partitionFilter) {
+                                      String stagingDir, Map<String, String> partitionFilter,
+                                      boolean checkDuplicateFiles) {

Review comment:
       Instead of changing every place where this code is called, would it make more sense to add an overload that calls into this one and sets `checkDuplicateFiles` to a default value, so that the other code does not have to change?
   
   Would a default value here be reasonable or no?
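
   The suggestion above can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: `ImportOverloadSketch` is a hypothetical stand-in for SparkTableUtil, the parameter lists are shortened to plain strings, and the default of `false` is only an assumption (it keeps the old behaviour for existing callers; the thread leaves the right default open).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SparkTableUtil; only the overload pattern matters here.
final class ImportOverloadSketch {
  // Records each call so the delegation is observable in this sketch.
  static final List<String> CALLS = new ArrayList<>();

  // Existing signature: current callers compile unchanged and receive the default.
  // Assumption: false is used as the default to preserve the previous behaviour.
  static void importSparkTable(String sourceTable, String stagingDir) {
    importSparkTable(sourceTable, stagingDir, false);
  }

  // New signature with the extra flag; all of the real work would live here.
  static void importSparkTable(String sourceTable, String stagingDir, boolean checkDuplicateFiles) {
    CALLS.add(sourceTable + " checkDuplicateFiles=" + checkDuplicateFiles);
  }
}
```

   With this shape, only callers that care about the duplicate check need to pass the flag.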






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r674948179



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -431,6 +462,18 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> importedFiles = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING()).toDF("file_path");

Review comment:
       I think this covers most cases, but it will probably still miss the odd authority mismatch, for example if the metadata entries in my table have one authority or no authority and the paths in my new file list differ. This is probably as close as we can get without actually resolving everything, I think.
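
   To illustrate the authority caveat, here is a small self-contained sketch using only `java.net.URI` and plain collections instead of a Spark join. The class name, the helper names, and the example locations are all hypothetical. It shows that an exact string comparison, which is what an equality join on file_path amounts to, does not match two locations that differ only in their authority, while their path components are equal.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration; not part of SparkTableUtil.
final class AuthorityMismatchSketch {
  // Exact string match, mirroring an equality join on file_path.
  static List<String> exactDuplicates(List<String> existing, List<String> imported) {
    Set<String> known = new HashSet<>(existing);
    List<String> duplicates = new ArrayList<>();
    for (String path : imported) {
      if (known.contains(path)) {
        duplicates.add(path);
      }
    }
    return duplicates;
  }

  // Path component only, dropping scheme and authority. Shown for comparison;
  // this is not a safe normalisation in general, because two different
  // clusters can hold different files under the same path.
  static String pathOnly(String location) {
    return URI.create(location).getPath();
  }
}
```

   For instance, `hdfs://nn1:8020/warehouse/t/a.parquet` and `hdfs:///warehouse/t/a.parquet` are not reported by `exactDuplicates`, although `pathOnly` returns `/warehouse/t/a.parquet` for both.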






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r669320385



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -516,6 +590,48 @@ private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
         .run(item -> io.deleteFile(item.path()));
   }
 
+  // Attempt to use Spark3 Catalog resolution if available on the path

Review comment:
       Yes






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark : Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r663321805



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -355,9 +366,11 @@ public boolean isDefinedAt(Expression attr) {
    * @param targetTable an Iceberg table where to import the data
    * @param stagingDir a staging directory to store temporary manifest files
    * @param partitionFilter only import partitions whose values match those in the map, can be partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
    */
   public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
-                                      String stagingDir, Map<String, String> partitionFilter) {
+                                      String stagingDir, Map<String, String> partitionFilter,
+                                      boolean checkDuplicateFiles) {

Review comment:
       Yes, you are right. I can do it.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r674941963



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -95,11 +102,16 @@
  */
 public class SparkTableUtil {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SparkTableUtil.class);
+
   private static final Joiner.MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");
 
   private static final PathFilter HIDDEN_PATH_FILTER =
       p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
 
+  private static final String duplicateFileMessage = "Duplicate data files will be added to this table: %s.  " +

Review comment:
       I think this should be reworded a bit:
   "Cannot complete import because data files to be imported already exist within the target table. Iceberg is not designed to have multiple references to the same file within the same table, so this type of import is disabled by default. If you are sure this is what you would like to do, set '$doAVariableReferenceHere' to true to force the import."
   
   This is just to make sure folks know that by importing things twice they are not necessarily doing something that will work or be safe in the long run. For example, duplicate file entries will ... have odd effects on MergeInto :)






[GitHub] [iceberg] szehon-ho commented on pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#issuecomment-881053754


   @RussellSpitzer could you take another look when you have time?  Thanks




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r665736535



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(

Review comment:
       fileDs -> importedPaths? Just looking for a slightly more descriptive name here.






[GitHub] [iceberg] kbendick commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r670866917



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -355,9 +366,11 @@ public boolean isDefinedAt(Expression attr) {
    * @param targetTable an Iceberg table where to import the data
    * @param stagingDir a staging directory to store temporary manifest files
    * @param partitionFilter only import partitions whose values match those in the map, can be partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
    */
   public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
-                                      String stagingDir, Map<String, String> partitionFilter) {
+                                      String stagingDir, Map<String, String> partitionFilter,
+                                      boolean checkDuplicateFiles) {

Review comment:
       Thank you!






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r675246494



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -95,11 +102,16 @@
  */
 public class SparkTableUtil {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SparkTableUtil.class);
+
   private static final Joiner.MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");
 
   private static final PathFilter HIDDEN_PATH_FILTER =
       p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
 
+  private static final String duplicateFileMessage = "Duplicate data files will be added to this table: %s.  " +

Review comment:
       It's a bit long, so I shortened it just a little but kept the main idea.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -95,11 +102,16 @@
  */
 public class SparkTableUtil {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SparkTableUtil.class);
+
   private static final Joiner.MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");
 
   private static final PathFilter HIDDEN_PATH_FILTER =
       p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
 
+  private static final String duplicateFileMessage = "Duplicate data files will be added to this table: %s.  " +
+      "Enable ignore.duplicates flag to avoid this error";

Review comment:
       Fixed the parameter name.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r665740514



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
+            .toDF("file_path");
+
+        Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES);
+        Column joinCond = existingFiles.col("data_file.file_path").equalTo(fileDs.col("file_path"));
+        Dataset<Row> duplicates = fileDs.join(existingFiles, joinCond);
+        List<String> duplicateList = Arrays.stream((Row[]) duplicates.head(10))

Review comment:
       We could use takeAsList to make this a bit simpler. Also, if we push the selection into the join, it will be a cheaper join.
   
   So two things here:
   
   Dataset<String> duplicates = importedFiles.join(existingFiles, joinCond).select("file_path").as(Encoders.STRING());
   List<String> duplicateList = duplicates.takeAsList(10);






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r665742233



##########
File path: api/src/main/java/org/apache/iceberg/exceptions/DuplicateFileException.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.exceptions;
+
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * This exception indicates that duplicate data files will be added to a table.
+ */
+public class DuplicateFileException extends RuntimeException {

Review comment:
       I don't have strong feelings about this, but I don't think we need a new exception just for this. I think a Preconditions.checkArgument or checkState would be fine, with the appropriate message kept as a constant string template in SparkTableUtil.
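
   A rough sketch of that alternative, using only the standard library so it runs on its own: the `checkArgument` helper below is a hypothetical stand-in for a Preconditions-style check, and the class name, constant name, and message wording are assumptions for illustration rather than what the PR ended up with.

```java
import java.util.List;

// Hypothetical illustration; not the code in the PR.
final class DuplicateCheckSketch {
  // Message kept as a constant string template, as suggested above.
  // The wording is an assumption made for this sketch.
  static final String DUPLICATE_FILE_MESSAGE =
      "Cannot complete import because data files to be imported already exist within the target table: %s";

  // Stand-in for a Preconditions-style argument check.
  static void checkArgument(boolean condition, String template, Object... args) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  // Fails with the templated message when any duplicate paths were found.
  static void failOnDuplicates(List<String> duplicates) {
    checkArgument(duplicates.isEmpty(), DUPLICATE_FILE_MESSAGE, String.join(", ", duplicates));
  }
}
```

   Callers then see a standard IllegalArgumentException that lists the offending paths, and no dedicated exception class is needed.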






[GitHub] [iceberg] szehon-ho commented on a change in pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#discussion_r671591451



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -425,6 +455,21 @@ private static void importUnpartitionedSparkTable(SparkSession spark, TableIdent
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> fileDs = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
+            .toDF("file_path");
+
+        Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES);
+        Column joinCond = existingFiles.col("data_file.file_path").equalTo(fileDs.col("file_path"));
+        Dataset<Row> duplicates = fileDs.join(existingFiles, joinCond);
+        List<String> duplicateList = Arrays.stream((Row[]) duplicates.head(10))

Review comment:
       Yeah, and before it was doing 'duplicates.select("file_path").head(10)' and still failed. I think the pruning issue got worse in Spark 3.1 (I wrote some tests in #2783). Or was there another select you had in mind?






[GitHub] [iceberg] szehon-ho commented on pull request #2779: Spark: Add duplicate file check in add_files

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2779:
URL: https://github.com/apache/iceberg/pull/2779#issuecomment-875946616


   @RussellSpitzer  could you take a look if you have some time?

