Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/11 23:50:35 UTC

[GitHub] [iceberg] RussellSpitzer opened a new pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

RussellSpitzer opened a new pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584


   …ibuted
   
   Previously, a MERGE INTO command threw an exception when its source and target
   shared the same distribution: the join was planned without an exchange, but
   DynamicFileFilterExec then changed the partitioning. This could only happen
   if both the target and the source happened to have a single partition
   and were represented with the SinglePartition distribution. If the filter removes
   the only partition from the target, the two sides of the join have different numbers
   of partitions and no exchange is present to reconcile them. This breaks the join.
   
   We do not currently expose any other distribution information, so in most cases
   the source and target report an unknown distribution, which avoids this problem.
   In those situations an exchange always follows the DynamicFileFilterExec, which makes
   sure the number of partitions always matches for the join.
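
A minimal sketch of the failure mode described above, written in plain Java rather than against Spark's API. Partitions are modeled as plain lists, and the names `PartitionPaddingSketch`, `padIfEmpty` and `zipPartitions` are hypothetical; this is an illustration under those assumptions, not the code from this pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PartitionPaddingSketch {
  // Assumption: a scan result is modeled as a list of partitions (lists of rows).
  // If filtering left no partitions at all, return one empty partition instead.
  static <T> List<List<T>> padIfEmpty(List<List<T>> partitions) {
    if (partitions.isEmpty()) {
      List<List<T>> padded = new ArrayList<>();
      padded.add(new ArrayList<T>());
      return padded;
    }
    return partitions;
  }

  // Models a join planned without an exchange: partition i on the left is
  // paired with partition i on the right, so both sides need the same count.
  static int zipPartitions(List<? extends List<?>> left, List<? extends List<?>> right) {
    if (left.size() != right.size()) {
      throw new IllegalArgumentException(
          "Unequal numbers of partitions: " + left.size() + " vs " + right.size());
    }
    return left.size();
  }

  public static void main(String[] args) {
    List<List<String>> source = new ArrayList<>();
    source.add(Arrays.asList("source-row"));
    // The dynamic filter removed the only partition of the target.
    List<List<String>> target = new ArrayList<>();
    // Without padding, zipPartitions(source, target) would throw.
    System.out.println(zipPartitions(source, padIfEmpty(target)));
  }
}
```

Padding only matters when no exchange sits between the filter and the join; with an exchange in place, the partition count is re-established downstream.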


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 221770490011111 commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
221770490011111 commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-839609519


   Thank you very much. I have another question, could you help me answer it? When I run a MERGE INTO operation on the same table, an error appears. Can I obtain the state of the table to detect this? Thanks.
   
   




[GitHub] [iceberg] 221770490011111 commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
221770490011111 commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-840227645


   > > Thank you very much. I have another question, could you help me answer it? When I run a MERGE INTO operation on the same table, an error appears. Can I obtain the state of the table to detect this? Thanks.
   > 
   > I'm not sure what you are asking. Are you seeing a conflict error saying that the merge operation violates the constraint of another operation?
   
   Thank you for your reply. Yes, that is the kind of error. Is there a better solution to this? Or can I get the state of the merge operation?
   
   




[GitHub] [iceberg] aokolnychyi commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-839385622


   cc @dilipbiswal @rdblue @prodeezy 




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r630701730



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the partition. Currently this can only occur in the SinglePartition distribution is in use which only
+  happens if both the target and source have a single partition, but if it does we have the potential of eliminating
+  the only partition in the target. If there are no partitions in the target then we will throw an exception because
+  the partitioning was assumed to be the same 1 partition in source and target. We fix this by making sure that
+  we always return at least 1 empty partition, in the future we may need to handle more complicated partitioner
+  scenarios.
+   */
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val result = scanExec.execute()
+    if (result.partitions.length == 0) {

Review comment:
       This won't break the 0 task planning because we only need to fix the case where no "exchange" is planned. If the original scan had 0 tasks, an exchange would be planned, so the number of output partitions we generate isn't important.
   
   I think we could also fix it in SparkMergeScan, but I'm not sure that is a clearer fix than the change here. What's the goal of moving the fix into SparkMergeScan? Just to make sure we don't change empty 0-partition RDDs to empty 1-partition RDDs?
   
   I think we are in the clear there, since an RDD is defined as empty if either its partitions list is of size 0 or rdd.take(1) is empty, so we are just swapping one empty representation for another that works in this use case.
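
To illustrate the emptiness argument above, here is a small plain-Java model (a sketch, not Spark code). `EmptyRddSketch` and its `isEmpty` method are hypothetical names, and a dataset is assumed to be a list of partitions, each a list of rows.

```java
import java.util.ArrayList;
import java.util.List;

final class EmptyRddSketch {
  // Mirrors the definition given above: empty when there are no partitions,
  // or when no partition can supply a first row.
  static boolean isEmpty(List<? extends List<?>> partitions) {
    if (partitions.isEmpty()) {
      return true;
    }
    for (List<?> partition : partitions) {
      if (!partition.isEmpty()) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<List<String>> zeroPartitions = new ArrayList<>();
    List<List<String>> oneEmptyPartition = new ArrayList<>();
    oneEmptyPartition.add(new ArrayList<>());
    // Both representations count as empty under this definition.
    System.out.println(isEmpty(zeroPartitions) + " " + isEmpty(oneEmptyPartition));
  }
}
```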






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r631244605



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the partition. Currently this can only occur in the SinglePartition distribution is in use which only
+  happens if both the target and source have a single partition, but if it does we have the potential of eliminating
+  the only partition in the target. If there are no partitions in the target then we will throw an exception because
+  the partitioning was assumed to be the same 1 partition in source and target. We fix this by making sure that
+  we always return at least 1 empty partition, in the future we may need to handle more complicated partitioner
+  scenarios.
+   */
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val result = scanExec.execute()
+    if (result.partitions.length == 0) {

Review comment:
       I am convinced.






[GitHub] [iceberg] 221770490011111 commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
221770490011111 commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-842005197


   > > > > @Reo-LEI @221770490011111 Could you please take a look?
   > > > 
   > > > 
   > > > This patch passes my test case and runs correctly in a production environment. I think this problem is resolved. Thanks for your fix! @RussellSpitzer
   > > 
   > > 
   > > Awesome!! Are you using this branch, RussellSpitzer:FixDynamicFilterEmptyMerge? @Reo-LEI
   > 
   > Nope, I maintain an internal branch and merged this PR into it.
   
   Thanks.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r630682483



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the partition. Currently this can only occur in the SinglePartition distribution is in use which only
+  happens if both the target and source have a single partition, but if it does we have the potential of eliminating
+  the only partition in the target. If there are no partitions in the target then we will throw an exception because
+  the partitioning was assumed to be the same 1 partition in source and target. We fix this by making sure that
+  we always return at least 1 empty partition, in the future we may need to handle more complicated partitioner
+  scenarios.
+   */
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val result = scanExec.execute()
+    if (result.partitions.length == 0) {

Review comment:
       We could also remember the original output partitioning in dynamic filtering and then do that check. 






[GitHub] [iceberg] 221770490011111 commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
221770490011111 commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-840613206


   > > > > > Thank you very much. I have another question, could you help me answer it? When I run a MERGE INTO operation on the same table, an error appears. Can I obtain the state of the table to detect this? Thanks.
   > > > > 
   > > > > 
   > > > > I'm not sure what you are asking. Are you seeing a conflict error saying that the merge operation violates the constraint of another operation?
   > > > 
   > > > 
   > > > Thank you for your reply. Yes, that is the kind of error. Is there a better solution to this? Or can I get the state of the merge operation?
   > > 
   > > 
   > > This is the specific error:
   > > 2021-05-13 16:16:50java.sql.SQLException: Error running query: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching true: [alluxio://alluxio-master-0.default.svc.cluster.local:19998/cdp.db/dim_cust_user_mid/data/00000-4074
   > 
   > It would probably be best for you to ask about this on the mailing list, but the issue here is that another operation committed while the merge was being created. This invalidated the merge that had been created, causing it to fail.
   > 
   > A trivial example:
   > 
   > User A changes Row (1, 2) to (2, 2)
   > User B changes Row (1, 2) to (4, 2)
   > 
   > If both users attempt to do this at the same time, one must fail, since the two operations cannot both have happened. The validation step in Iceberg automatically causes whoever finishes second to fail.
   
   Thank you very much for your reply.
   
   




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r631259907



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the partition. Currently this can only occur in the SinglePartition distribution is in use which only
+  happens if both the target and source have a single partition, but if it does we have the potential of eliminating
+  the only partition in the target. If there are no partitions in the target then we will throw an exception because
+  the partitioning was assumed to be the same 1 partition in source and target. We fix this by making sure that
+  we always return at least 1 empty partition, in the future we may need to handle more complicated partitioner
+  scenarios.
+   */
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val result = scanExec.execute()
+    if (result.partitions.length == 0) {

Review comment:
       We could add a test with 0 files planned to be safe.






[GitHub] [iceberg] Reo-LEI commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-840238396


   > @Reo-LEI @221770490011111 Could you please take a look?
   
   Awesome!! I will do some tests on this patch today. Thank you very much for your follow-up




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r630659943



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the partition. Currently this can only occur in the SinglePartition distribution is in use which only

Review comment:
       `partition` -> `the output partitioning of the node`.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the partition. Currently this can only occur in the SinglePartition distribution is in use which only
+  happens if both the target and source have a single partition, but if it does we have the potential of eliminating
+  the only partition in the target. If there are no partitions in the target then we will throw an exception because
+  the partitioning was assumed to be the same 1 partition in source and target. We fix this by making sure that
+  we always return at least 1 empty partition, in the future we may need to handle more complicated partitioner
+  scenarios.
+   */
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val result = scanExec.execute()
+    if (result.partitions.length == 0) {

Review comment:
       Will this break the case where we originally planned 0 tasks? Then the correct distribution is UnspecifiedDistribution(0), but we will report an RDD with one partition.
   
   I thought about adding a check like `outputPartitioning == SinglePartition`, but that is not going to work because the partitioning will be the current one (i.e. whatever the scan node reports now, not what it reported originally).
   
   Question: can we approach this problem from a different angle? Say we make our `SparkMergeScan` report a single empty task after dynamic filtering if we originally had only 1 partition.
   
   I think it should be sufficient to modify `filterFiles` in the following way:
   
   ```
     @Override
     public void filterFiles(Set<String> locations) {
       singlePartitionScan = tasks().size() == 1;
       ...
     }
   ```
   
   And then `tasks`:
   
   ```
       if (singlePartitionScan && tasks.isEmpty()) {
         tasks = Lists.newArrayList(new BaseCombinedScanTask(Lists.newArrayList()));
       }
   ```
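
The two fragments above can be put together in a self-contained model. This is only a sketch of the proposal: `MergeScanSketch` is a hypothetical stand-in for `SparkMergeScan`, each task is assumed to be a plain list of file locations, and the file-filtering loop is an assumption about what the elided `...` does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

final class MergeScanSketch {
  // Assumption: a task is modeled as the list of file locations it reads.
  private List<List<String>> tasks;
  private boolean singlePartitionScan = false;

  MergeScanSketch(List<List<String>> plannedTasks) {
    this.tasks = new ArrayList<>(plannedTasks);
  }

  void filterFiles(Set<String> locations) {
    // Remember whether the scan had exactly one task before filtering.
    singlePartitionScan = tasks().size() == 1;
    List<List<String>> kept = new ArrayList<>();
    for (List<String> task : tasks) {
      List<String> files = new ArrayList<>();
      for (String file : task) {
        if (locations.contains(file)) {
          files.add(file);
        }
      }
      if (!files.isEmpty()) {
        kept.add(files);
      }
    }
    tasks = kept;
  }

  List<List<String>> tasks() {
    // Report a single empty task if filtering removed the only task.
    if (singlePartitionScan && tasks.isEmpty()) {
      List<List<String>> single = new ArrayList<>();
      single.add(new ArrayList<String>());
      return single;
    }
    return tasks;
  }

  public static void main(String[] args) {
    List<List<String>> planned = new ArrayList<>();
    planned.add(Arrays.asList("file-a"));
    MergeScanSketch scan = new MergeScanSketch(planned);
    scan.filterFiles(Collections.<String>emptySet());
    System.out.println(scan.tasks().size());
  }
}
```

A scan that was planned with 0 tasks keeps reporting 0 tasks in this model, which is the case the question above is concerned about.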






[GitHub] [iceberg] 221770490011111 commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
221770490011111 commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-841936884


   > > @Reo-LEI @221770490011111 Could you please take a look?
   > 
   > This patch passes my test case and runs correctly in a production environment. I think this problem is resolved. Thanks for your fix! @RussellSpitzer
   
   Awesome!! Are you using this branch, RussellSpitzer:FixDynamicFilterEmptyMerge? @Reo-LEI




[GitHub] [iceberg] aokolnychyi commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-842471648


   Thanks, @RussellSpitzer! Thanks for reviewing, @kbendick!




[GitHub] [iceberg] aokolnychyi commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-842471467








[GitHub] [iceberg] RussellSpitzer commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-840569396


   > > > > Thank you very much. I have another question, could you help me answer it? When I run a MERGE INTO operation on the same table, an error appears. Can I obtain the state of the table to detect this? Thanks.
   > > > 
   > > > 
   > > > I'm not sure what you are asking. Are you seeing a conflict error saying that the merge operation violates the constraint of another operation?
   > > 
   > > 
   > > Thank you for your reply. Yes, that is the kind of error. Is there a better solution to this? Or can I get the state of the merge operation?
   > 
   > This is the specific error:
   > 2021-05-13 16:16:50java.sql.SQLException: Error running query: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching true: [alluxio://alluxio-master-0.default.svc.cluster.local:19998/cdp.db/dim_cust_user_mid/data/00000-4074
   
   It would probably be best for you to ask about this on the mailing list, but the issue here is that another operation committed while the merge was being created. This invalidated the merge that had been created, causing it to fail.
   
   A trivial example:
   
   User A changes Row (1, 2) to (2, 2)
   User B changes Row (1, 2) to (4, 2)
   
   If both users attempt to do this at the same time, one must fail, since the two operations cannot both have happened. The validation step in Iceberg automatically causes whoever finishes second to fail.
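
The example above can be modeled with a simple version check. This is a minimal sketch of optimistic concurrency in general, not Iceberg's implementation: `VersionedTableSketch` is a hypothetical class, a single version counter stands in for Iceberg's file-level conflict validation, and `IllegalStateException` stands in for the `ValidationException` seen in the error report.

```java
final class VersionedTableSketch {
  private long version = 0;
  private int[] row = {1, 2};

  synchronized long currentVersion() {
    return version;
  }

  synchronized int[] row() {
    return row.clone();
  }

  // The commit succeeds only if no other commit landed since baseVersion was read.
  synchronized void commit(long baseVersion, int[] newRow) {
    if (baseVersion != version) {
      throw new IllegalStateException(
          "Conflicting commit: table moved from version " + baseVersion + " to " + version);
    }
    row = newRow.clone();
    version++;
  }

  public static void main(String[] args) {
    VersionedTableSketch table = new VersionedTableSketch();
    long baseA = table.currentVersion(); // user A starts from version 0
    long baseB = table.currentVersion(); // user B starts from version 0 as well
    table.commit(baseA, new int[] {2, 2}); // A finishes first and succeeds
    try {
      table.commit(baseB, new int[] {4, 2}); // B finishes second
    } catch (IllegalStateException e) {
      System.out.println("User B failed: " + e.getMessage());
    }
  }
}
```

In this model the losing writer would typically re-read the table and retry against the new version.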




[GitHub] [iceberg] 221770490011111 commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
221770490011111 commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-840402067


   > > > Thank you very much. I have another question, could you help me answer it? When I run a MERGE INTO operation on the same table, an error appears. Can I obtain the state of the table to detect this? Thanks.
   > > 
   > > 
   > > I'm not sure what you are asking. Are you seeing a conflict error saying that the merge operation violates the constraint of another operation?
   > 
   > Thank you for your reply. Yes, that is the kind of error. Is there a better solution to this? Or can I get the state of the merge operation?
   
   This is the specific error:
   2021-05-13 16:16:50java.sql.SQLException: Error running query: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching true: [alluxio://alluxio-master-0.default.svc.cluster.local:19998/cdp.db/dim_cust_user_mid/data/00000-4074




[GitHub] [iceberg] aokolnychyi merged pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584


   




[GitHub] [iceberg] RussellSpitzer commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-839837544


   > Thank you very much. I have another question, could you help me answer it? When I run a MERGE INTO operation on the same table, an error appears. Can I obtain the state of the table to detect this? Thanks.
   
   I'm not sure what you are asking. Are you seeing a conflict error saying that the merge operation violates the constraint of another operation?




[GitHub] [iceberg] Reo-LEI commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-841151547


   > @Reo-LEI @221770490011111 Could you please take a look?
   
   This patch passes my test case and runs correctly in our production environment. I think this problem is resolved. Thanks for your fix! @RussellSpitzer




[GitHub] [iceberg] kbendick commented on a change in pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r631617870



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the output partitioning of the node. Currently this can only occur if the SinglePartition distribution is
+  in use which only happens if both the target and source have a single partition, but if it does we have the potential
+  of eliminating the only partition in the target. If there are no partitions in the target then we will throw an
+  exception because the partitioning was assumed to be the same 1 partition in source and target. We fix this by making
+  sure that we always return at least 1 empty partition, in the future we may need to handle more complicated
+  partitioner scenarios.
+   */

Review comment:
       Nit: Should you use either a `/** ... */` or just a bunch of `//` for the comments?
   
   I don't normally see a block comment in the iceberg repo with `/* .... */` like this. If this is a standard practice in the repo, then ignore my comment.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r632774002



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the output partitioning of the node. Currently this can only occur if the SinglePartition distribution is
+  in use which only happens if both the target and source have a single partition, but if it does we have the potential
+  of eliminating the only partition in the target. If there are no partitions in the target then we will throw an
+  exception because the partitioning was assumed to be the same 1 partition in source and target. We fix this by making
+  sure that we always return at least 1 empty partition, in the future we may need to handle more complicated
+  partitioner scenarios.
+   */

Review comment:
       I don't think this counts as a Javadoc; it's just an internal implementation note, mostly to remind me to be wary when we change this code path in the future.
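
The guard that the implementation note describes can be sketched without Spark. This is only an illustration under stated assumptions: `Partitioned` and `EmptyPartitionGuard.ensureNonEmpty` are hypothetical names, and partitions are modeled as plain vectors rather than an RDD, so it is not the code from the patch.

```scala
// Hypothetical stand-in for an RDD: each inner Vector is one partition.
final case class Partitioned[A](partitions: Vector[Vector[A]])

object EmptyPartitionGuard {
  // If filtering removed every partition, return a single empty partition so the
  // partition count still matches what a single-partition plan expects.
  // Otherwise the result is passed through unchanged.
  def ensureNonEmpty[A](result: Partitioned[A]): Partitioned[A] =
    if (result.partitions.isEmpty) Partitioned(Vector(Vector.empty[A]))
    else result
}
```

With this shape, a join that was planned against exactly one partition on each side still sees one (empty) partition on the target side, instead of zero.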






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r631245340



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the partition. Currently this can only occur if the SinglePartition distribution is in use which only
+  happens if both the target and source have a single partition, but if it does we have the potential of eliminating
+  the only partition in the target. If there are no partitions in the target then we will throw an exception because
+  the partitioning was assumed to be the same 1 partition in source and target. We fix this by making sure that
+  we always return at least 1 empty partition, in the future we may need to handle more complicated partitioner
+  scenarios.
+   */
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val result = scanExec.execute()
+    if (result.partitions.length == 0) {

Review comment:
       But now I have doubts! 






[GitHub] [iceberg] aokolnychyi commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-842471467


   This has been open for a while. I think it is a safe solution for now but we should reconsider our dynamic filters in the future.




[GitHub] [iceberg] RussellSpitzer commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-839294873


   Fixes #2533 




[GitHub] [iceberg] Reo-LEI commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-841939131


   > > > @Reo-LEI @221770490011111 Could you please take a look?
   > > 
   > > 
   > > This patch passes my test case and runs correctly in our production environment. I think this problem is resolved. Thanks for your fix! @RussellSpitzer
   > 
   > Awesome!! Are you using this branch, RussellSpitzer:FixDynamicFilterEmptyMerge? @Reo-LEI
   
   Nope, I maintain an internal branch and merged this PR into it.




[GitHub] [iceberg] RussellSpitzer commented on pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#issuecomment-839296625


   @Reo-LEI @221770490011111  Could you please take a look?




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r630682483



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the partition. Currently this can only occur if the SinglePartition distribution is in use which only
+  happens if both the target and source have a single partition, but if it does we have the potential of eliminating
+  the only partition in the target. If there are no partitions in the target then we will throw an exception because
+  the partitioning was assumed to be the same 1 partition in source and target. We fix this by making sure that
+  we always return at least 1 empty partition, in the future we may need to handle more complicated partitioner
+  scenarios.
+   */
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val result = scanExec.execute()
+    if (result.partitions.length == 0) {

Review comment:
       Alternatively, we could remember the original output partitioning in the dynamic filter and then do that check only when it applies.
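
One way to read this alternative is sketched below in plain Scala. Everything here is an assumption made for illustration: the `Partitioning` trait and its two cases are hypothetical stand-ins that only borrow names from Spark's partitioning classes, and `PartitioningAwareGuard.reconcile` is an invented helper, not part of the PR.

```scala
// Hypothetical stand-ins for the partitioning recorded at planning time.
sealed trait Partitioning
case object SinglePartition extends Partitioning
case object UnknownPartitioning extends Partitioning

object PartitioningAwareGuard {
  // `planned` is the output partitioning remembered before filtering;
  // `filtered` is the list of partitions left after the dynamic file filter ran.
  // Only a plan that promised a single partition gets an empty partition added back.
  def reconcile[A](planned: Partitioning, filtered: Vector[Vector[A]]): Vector[Vector[A]] =
    planned match {
      case SinglePartition if filtered.isEmpty => Vector(Vector.empty[A])
      case _                                   => filtered
    }
}
```

Compared with checking only for zero partitions, this keeps the correction tied to the case where the planner actually relied on the partition count.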





