Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/18 03:53:32 UTC

[GitHub] [iceberg] singhpk234 opened a new pull request, #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

singhpk234 opened a new pull request, #4800:
URL: https://github.com/apache/iceberg/pull/4800

   Extend the handling from https://github.com/apache/iceberg/pull/4687 to SparkPositionDeltaWrite, which is used in merge-on-read (MOR).
   
   TODO: Add a unit test
   
   ---
   
   cc @RussellSpitzer 
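
   The handling being extended can be sketched as follows (hedged: the class and method names below are illustrative stand-ins, not the actual SparkPositionDeltaWrite code):

```java
// Hedged sketch of the commit-unknown handling this PR extends to
// SparkPositionDeltaWrite. All names here are illustrative stand-ins.
class CommitStateUnknownException extends RuntimeException {
  CommitStateUnknownException(Throwable cause) {
    super(cause);
  }
}

class DeltaCommitter {
  // When a commit's outcome is unknown, abort() must NOT delete files:
  // they may already be referenced by a commit that actually succeeded.
  boolean cleanupOnAbort = true;

  void commitOperation(Runnable commit) {
    try {
      commit.run();
    } catch (CommitStateUnknownException e) {
      cleanupOnAbort = false; // leave files in place; state is unknown
      throw e;
    }
  }
}
```

   The writer's abort path then consults `cleanupOnAbort` before deleting any files.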


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4800:
URL: https://github.com/apache/iceberg/pull/4800#discussion_r878194963


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -224,7 +227,7 @@ private Expression conflictDetectionFilter(SparkBatchQueryScan queryScan) {
     @Override
     public void abort(WriterCommitMessage[] messages) {
       for (WriterCommitMessage message : messages) {
-        if (message != null) {
+        if (message != null && cleanupOnAbort) {

Review Comment:
   minor nit here, but we can test this outside the loop
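
   A minimal sketch of the suggested refactor, hoisting the flag check out of the loop (hedged: the cleanup body and class name are illustrative, not the real SparkPositionDeltaWrite internals):

```java
// Illustrative stand-in for Spark's WriterCommitMessage.
interface WriterCommitMessage {}

class AbortSketch {
  private final boolean cleanupOnAbort;
  private int cleanedUp = 0;

  AbortSketch(boolean cleanupOnAbort) {
    this.cleanupOnAbort = cleanupOnAbort;
  }

  // The flag does not change per message, so test it once, outside the loop.
  public void abort(WriterCommitMessage[] messages) {
    if (!cleanupOnAbort) {
      return; // commit outcome unknown; leave the task's files in place
    }
    for (WriterCommitMessage message : messages) {
      if (message != null) {
        cleanedUp++; // stands in for deleting the task's data/delete files
      }
    }
  }

  public int cleanedUp() {
    return cleanedUp;
  }
}
```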





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #4800:
URL: https://github.com/apache/iceberg/pull/4800#discussion_r877647813


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java:
##########
@@ -43,4 +49,8 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
 
     return new SparkTable(table, false);
   }
+
+  public static void setDummyIcebergTbl(Table dummyTbl) {

Review Comment:
   +1, agree with you. The present impl allows me to set only one table per unit test; having a map and doing a look-up would allow creating more than one table at a given time :)
   
   Made the changes






[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4800:
URL: https://github.com/apache/iceberg/pull/4800#discussion_r877558934


##########
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java:
##########
@@ -37,4 +52,51 @@ protected Map<String, String> extraTableProperties() {
         TableProperties.DELETE_MODE, "merge-on-read"
     );
   }
+
+
+  @Test
+  public void testCommitUnknownException() throws IOException, NoSuchTableException {
+    createAndInitTable("id INT, dep STRING, category STRING");
+
+    // write an unpartitioned file
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\", \"category\": \"c1\"}");
+    append(tableName, "{ \"id\": 2, \"dep\": \"hr\", \"category\": \"c1\" }\n" +
+        "{ \"id\": 3, \"dep\": \"hr\", \"category\": \"c1\" }");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    RowDelta newRowDelta = table.newRowDelta();
+    RowDelta spyNewRowDelta = spy(newRowDelta);
+    doAnswer(invocation -> {
+      newRowDelta.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyNewRowDelta).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newRowDelta()).thenReturn(spyNewRowDelta);
+    SparkTable sparkTable = new SparkTable(spyTable, false);
+

Review Comment:
   nit: double space





[GitHub] [iceberg] rdblue merged pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
rdblue merged PR #4800:
URL: https://github.com/apache/iceberg/pull/4800




[GitHub] [iceberg] RussellSpitzer commented on pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #4800:
URL: https://github.com/apache/iceberg/pull/4800#issuecomment-1132120133

   @singhpk234 That's a great solution! Maybe I'll rework my PR later to do the same thing!




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4800:
URL: https://github.com/apache/iceberg/pull/4800#discussion_r877560914


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java:
##########
@@ -43,4 +49,8 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
 
     return new SparkTable(table, false);
   }
+
+  public static void setDummyIcebergTbl(Table dummyTbl) {

Review Comment:
   Probably not needed, but you could do a similar map of "Identifier -> Table" and allow our tests to insert new tables into that map and then request them on demand, then have a method clear it out after each test. Just thinking about this for tests which run in multiple parameterizations (which would otherwise share the same static dummyIcebergTbl).
   
   I know I'm biased since this is how I did it in my other example, but I like your placement in the Catalog here much more than mine.
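
   The map idea can be sketched roughly like this (hedged: the registry and its method names are hypothetical; `String` stands in for Spark's `Identifier` and `Object` for Iceberg's `Table` to keep the sketch self-contained):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "Identifier -> Table" map for TestSparkCatalog.
class TestTableRegistry {
  private static final Map<String, Object> TEST_TABLES = new HashMap<>();

  static void setTable(String ident, Object table) {
    TEST_TABLES.put(ident, table);
  }

  // loadTable(ident) would consult this map first and fall through otherwise.
  static Object lookup(String ident) {
    return TEST_TABLES.get(ident);
  }

  // Clear from an @After hook so parameterized runs don't share spy tables.
  static void clearTables() {
    TEST_TABLES.clear();
  }
}
```

   This lets one test register several spy tables at once and guarantees a clean slate between parameterized runs.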





[GitHub] [iceberg] rdblue commented on pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #4800:
URL: https://github.com/apache/iceberg/pull/4800#issuecomment-1140521292

   Thanks, @singhpk234!




[GitHub] [iceberg] flyrain commented on a diff in pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #4800:
URL: https://github.com/apache/iceberg/pull/4800#discussion_r877622612


##########
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java:
##########
@@ -37,4 +52,51 @@ protected Map<String, String> extraTableProperties() {
         TableProperties.DELETE_MODE, "merge-on-read"
     );
   }
+
+

Review Comment:
   Nit: double space





[GitHub] [iceberg] RussellSpitzer commented on pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #4800:
URL: https://github.com/apache/iceberg/pull/4800#issuecomment-1130559838

   @singhpk234 you probably saw how I wrote the test in the other suite; it is not pretty, but I think you may be able to pull something like that off as well. Although this probably won't be easy unless it is coupled with a Spark version that natively supports the merge operations (3.3, I believe @aokolnychyi has it merged in), since at the moment we would need custom plan rules as well, I think.
   
   I'm getting close to thinking we need byteman or some other bytecode fault injector :(.




[GitHub] [iceberg] singhpk234 commented on pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on PR #4800:
URL: https://github.com/apache/iceberg/pull/4800#issuecomment-1131574206

   >since at the moment we would need custom plan rules as well I think
   
   +1, I was also thinking along these lines. We would need analyzer rules here so that we could get a DSv2Relation which holds our mocked table object (which would throw our CommitStateUnknownException when commit is called). The main problem, as you highlighted, is the same: it is only supported in SQL via our Extensions :(.




[GitHub] [iceberg] singhpk234 commented on pull request #4800: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on PR #4800:
URL: https://github.com/apache/iceberg/pull/4800#issuecomment-1132117821

   @RussellSpitzer, I went one step further and used the catalog to return the spy table I want, using a static prop to get the spy SparkTable instead :)



