Posted to issues@iceberg.apache.org by "ConeyLiu (via GitHub)" <gi...@apache.org> on 2023/06/28 10:56:22 UTC

[GitHub] [iceberg] ConeyLiu opened a new pull request, #7933: Core: Abort file groups should be under same lock as committerService

ConeyLiu opened a new pull request, #7933:
URL: https://github.com/apache/iceberg/pull/7933

   We hit a rare corner case in which a rewrite job was aborted after timing out while waiting for commits, and some files that had already been committed were deleted in the process. The problem is that the aborting thread (the main thread) and the committerService thread can operate on `completedRewrites` concurrently. This change makes the main thread take the same lock as the committerService thread when aborting.
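
The idea of aborting under the same lock can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `LockedAbortSketch`, `commitBatch`, and `abortRemaining` are hypothetical names, a "commit" is reduced to recording the group in a list, and the commit is recorded inside the lock only to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified stand-in for a commit service; not the Iceberg class.
final class LockedAbortSketch {
  private final Queue<Integer> completedRewrites = new ConcurrentLinkedQueue<>();
  private final List<Integer> committed = new ArrayList<>();
  private final List<Integer> aborted = new ArrayList<>();

  void offer(int fileGroup) {
    completedRewrites.add(fileGroup);
  }

  // Committer thread: take up to batchSize groups and record them as committed,
  // all while holding the monitor of completedRewrites.
  void commitBatch(int batchSize) {
    synchronized (completedRewrites) {
      for (int i = 0; i < batchSize; i++) {
        Integer fileGroup = completedRewrites.poll();
        if (fileGroup == null) {
          break;
        }
        committed.add(fileGroup);
      }
    }
  }

  // Closing thread: abort whatever is still queued, holding the same monitor.
  void abortRemaining() {
    synchronized (completedRewrites) {
      Integer fileGroup;
      while ((fileGroup = completedRewrites.poll()) != null) {
        aborted.add(fileGroup);
      }
    }
  }

  List<Integer> committed() {
    synchronized (completedRewrites) {
      return new ArrayList<>(committed);
    }
  }

  List<Integer> aborted() {
    synchronized (completedRewrites) {
      return new ArrayList<>(aborted);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    LockedAbortSketch service = new LockedAbortSketch();
    for (int i = 0; i < 8; i++) {
      service.offer(i);
    }
    Thread committer = new Thread(() -> service.commitBatch(5), "Committer-Service");
    committer.start();
    service.abortRemaining(); // races with the committer thread
    committer.join();
    // Which thread wins varies between runs, but no group appears in both lists.
    System.out.println("committed=" + service.committed() + " aborted=" + service.aborted());
  }
}
```

Because taking a batch and aborting the remainder are mutually exclusive in this sketch, each file group ends up either committed or aborted, never both.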


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1258662864


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   > However, the committerService is not stopped and it still could commit the file groups (5 to 9).
   
   Is that true? I don't see how groups 5-9 would ever proceed, because we always return false for canCreateCommitGroup() (I also ran the test to check). Am I missing something?
   
   I thought the race condition is between committing and aborting the 0-4 group.
   
   If that is true, how about adding these comments?
   
   
   ```
   // Add file groups 0-4 for commit.
   // There are fewer than rewritesPerCommit, so commitService will not commit until the next batch is added.
   // During commit, these will sleep for a fixed duration, to test the race condition with the abort in close (below).
   for (int i = 0; i < 4; i++) {
     commitService.offer(i);
   }
   
   // Add file groups 5-9 for commit.
   // These are gated so that they cannot commit,
   // and only serve to allow file groups 0-4 to proceed.
   CustomCommitService spyCommitService = spy(commitService);
   doReturn(false).when(spyCommitService).canCreateCommitGroup();
   for (int i = 4; i < 8; i++) {
      spyCommitService.offer(i);
   }
   
   // Close commitService.
   // This will wait a fixed duration and then abort file groups 0-4,
   // testing the race condition as they attempt to finish committing (above).
   Assertions.assertThatThrownBy(commitService::close)
           .isInstanceOf(IllegalArgumentException.class)
           .hasMessageContaining("Timeout occurred when waiting for commits");
   
   // Wait for the commitService to finish trying to commit file groups
   Awaitility.await()
       .atMost(5, TimeUnit.SECONDS)
           .pollInSameThread()
           .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
   
   
   Assertions.assertThat(commitService.results())
         .doesNotContainAnyElementsOf(commitService.aborted);
   
   Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
   ```
   
   Can you check whether the comments and my understanding are right?
   
   I also feel the commitService results will not always contain 0-4, then? Rather, each group should be either aborted or committed.



##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    int numberOfFileGroups = 100;
+    Tasks.range(numberOfFileGroups).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the last group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {
+      spyCommitService.offer(i);
+    }
+
+    Assertions.assertThatThrownBy(commitService::close)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Timeout occurred when waiting for commits");
+
+    // Wait for the committerService finish commit the remaining file groups

Review Comment:
   Typo. I also suggest the following comment (see my other review comment for context):
   
   // Wait for the commitService to finish trying to commit file groups
   





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1261244010


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   @szehon-ho I updated the comments, please take a look.
   
   ```
   // Add file groups [0, 4) for commit.
   // There are fewer than rewritesPerCommit, so this will not trigger a commit action. Those file groups
   // will be added to the completedRewrites queue.
   // Now the queue has 4 file groups that need to be committed.
   for (int i = 0; i < 4; i++) {
       commitService.offer(i);
    }
   
   // Add file groups [4, 8) for commit.
   // These are gated so that they cannot commit, so all 4 of these file groups will be added to the queue as well.
   // Now the queue has 8 file groups that need to be committed.
   CustomCommitService spyCommitService = spy(commitService);
   doReturn(false).when(spyCommitService).canCreateCommitGroup();
   for (int i = 4; i < 8; i++) {
      spyCommitService.offer(i);
   }
   
   // Close commitService.
   // The committerService thread now starts to commit the remaining file groups in the completedRewrites queue,
   // while the main thread waits, up to a timeout, for the committerService thread to finish. There are 8 file groups
   // that need to be committed. The first commit will commit file groups [0, 5]. It waits a fixed duration to simulate
   // a commit that takes longer than the timeout.
   // This then tests the race condition between the main thread (trying to abort the remaining file groups [6, 8))
   // and the committerService thread (trying to commit file groups [6, 8)).
   Assertions.assertThatThrownBy(commitService::close)
           .isInstanceOf(IllegalArgumentException.class)
           .hasMessageContaining("Timeout occurred when waiting for commits");
   
   // Wait for the commitService to finish: either all file groups are committed or the remaining ones are aborted.
   Awaitility.await()
       .atMost(5, TimeUnit.SECONDS)
           .pollInSameThread()
           .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
   
   
   Assertions.assertThat(commitService.results())
         .doesNotContainAnyElementsOf(commitService.aborted);
   
   Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
   ```
   > I also feel the commitService results will not always have 0-4 then? But rather we should change the check to if they are either aborted or committed.
   
   [0, 4] should be committed successfully; [5, 8], however, may be either aborted or committed. I think we need to keep the following:
   ```
   Assertions.assertThat(commitService.results())
         .doesNotContainAnyElementsOf(commitService.aborted);
   ```
   This asserts that the committed and aborted file groups do not overlap.
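
Outside of AssertJ, the same invariant can be sketched with plain JDK collections. This is only an illustration: `OutcomeCheckSketch` and its methods are hypothetical helpers, not part of the test in this PR, and the stronger "resolved exactly once" check assumes the offered groups contain no duplicates.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the Iceberg test code.
final class OutcomeCheckSketch {
  // True when no file group appears in both collections.
  static <T> boolean noOverlap(Collection<T> committed, Collection<T> aborted) {
    return Collections.disjoint(committed, aborted);
  }

  // True when committed and aborted are disjoint and together cover exactly the offered groups.
  static <T> boolean eachGroupResolvedOnce(
      Collection<T> offered, Collection<T> committed, Collection<T> aborted) {
    Set<T> resolved = new HashSet<>(committed);
    resolved.addAll(aborted);
    return noOverlap(committed, aborted) && resolved.equals(new HashSet<>(offered));
  }

  public static void main(String[] args) {
    List<Integer> offered = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
    // A consistent outcome: five groups committed, three aborted.
    System.out.println(
        eachGroupResolvedOnce(offered, Arrays.asList(0, 1, 2, 3, 4), Arrays.asList(5, 6, 7)));
    // An inconsistent outcome: group 5 is both committed and aborted.
    System.out.println(noOverlap(Arrays.asList(0, 1, 2, 3, 4, 5), Arrays.asList(5, 6, 7)));
  }
}
```

The disjointness check alone matches the assertion kept in the test; the coverage check is the stricter variant raised in review, where every group must be either aborted or committed.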
   





[GitHub] [iceberg] chenjunjiedada commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1249615758


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {
+      spyCommitService.offer(i);
+    }
+
+    Assertions.assertThatThrownBy(commitService::close)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Timeout occurred when waiting for commits");
+
+    // Wait for the committerService finish commit the remaining file groups
+    Awaitility.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInSameThread()
+        .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
+    Assertions.assertThat(commitService.results())
+        .doesNotContainAnyElementsOf(commitService.aborted);

Review Comment:
   Besides this, do we need to check which groups are committed? I see the commit sleeps 210ms; does that mean all groups will be aborted, or that four of them will be committed successfully?





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1256591419


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   It took me a while to get this. This is to push the count above the rewritesPerCommit limit, allowing the first batch (0-4) to finally attempt a commit, right?





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1254351587


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {
+      spyCommitService.offer(i);
+    }
+
+    Assertions.assertThatThrownBy(commitService::close)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Timeout occurred when waiting for commits");
+
+    // Wait for the committerService finish commit the remaining file groups
+    Awaitility.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInSameThread()
+        .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
+    Assertions.assertThat(commitService.results())
+        .doesNotContainAnyElementsOf(commitService.aborted);

Review Comment:
   Added the check. The last 3 should be aborted.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1248834540


##########
core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java:
##########
@@ -64,17 +68,30 @@ abstract class BaseCommitService<T> implements Closeable {
    * @param rewritesPerCommit number of file groups to include in a commit
    */
   BaseCommitService(Table table, int rewritesPerCommit) {
+    this(table, rewritesPerCommit, TIMEOUT_IN_MS_DEFAULT);
+  }
+
+  /**
+   * Constructs a {@link BaseCommitService}
+   *
+   * @param table table to perform commit on
+   * @param rewritesPerCommit number of file groups to include in a commit
+   * @param timeoutInMS The timeout to wait for commits to complete after all rewrite jobs have been
+   *     completed
+   */
+  BaseCommitService(Table table, int rewritesPerCommit, long timeoutInMS) {
     this.table = table;
     LOG.info(
         "Creating commit service for table {} with {} groups per commit", table, rewritesPerCommit);
     this.rewritesPerCommit = rewritesPerCommit;
+    this.timeoutInMS = timeoutInMS;
 
     committerService =
         Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder().setNameFormat("Committer-Service").build());
 
     completedRewrites = Queues.newConcurrentLinkedQueue();
-    committedRewrites = Lists.newArrayList();
+    committedRewrites = Queues.newConcurrentLinkedQueue();

Review Comment:
   `committedRewrites` could be updated concurrently.
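
To illustrate why a concurrent collection matters here: a plain `ArrayList` is not safe for unsynchronized `add` calls from several threads, whereas `ConcurrentLinkedQueue` is. The following is a small sketch, not code from this PR; `ConcurrentAppendSketch` and `addConcurrently` are hypothetical names, and the thread and element counts are arbitrary.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the queue name mirrors the field discussed above.
final class ConcurrentAppendSketch {
  // Adds perThread elements from each of `threads` workers and returns the final queue size.
  static int addConcurrently(int threads, int perThread) {
    Queue<Integer> committedRewrites = new ConcurrentLinkedQueue<>();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      final int offset = t * perThread;
      pool.execute(
          () -> {
            for (int i = 0; i < perThread; i++) {
              committedRewrites.add(offset + i); // safe without external locking
            }
          });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("Workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for workers", e);
    }
    return committedRewrites.size();
  }

  public static void main(String[] args) {
    // No element is lost, so the size equals threads * perThread.
    System.out.println(addConcurrently(4, 1000));
  }
}
```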





[GitHub] [iceberg] chenjunjiedada commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1629973954

   @RussellSpitzer @szehon-ho @ConeyLiu Can we add a doc, such as a README, and a flowchart describing the commit logic? That would help with later optimizations and reviews.




[GitHub] [iceberg] ConeyLiu commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1635175421

   Thanks @szehon-ho for merging this and thanks @RussellSpitzer @chenjunjiedada @stevenzwu @yananli-ebay for reviewing.
   
   PR for 1.3.x submitted.




[GitHub] [iceberg] yananli-ebay commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "yananli-ebay (via GitHub)" <gi...@apache.org>.
yananli-ebay commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1251214244


##########
core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java:
##########
@@ -154,7 +171,7 @@ public void close() {
       // the commit pool to finish doing its commits to Iceberg State. In the case of partial
       // progress this should have been occurring simultaneously with rewrites, if not there should
       // be only a single commit operation.
-      if (!committerService.awaitTermination(120, TimeUnit.MINUTES)) {
+      if (!committerService.awaitTermination(timeoutInMS, TimeUnit.MILLISECONDS)) {

Review Comment:
   Can we also update the warning to use `timeoutInMS`, in case the timeout is changed to a custom value?
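
A minimal sketch of what this could look like, assuming the warning text is built from the configured value. The class name, method names, and message wording here are hypothetical and are not the actual log message in `BaseCommitService`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; the message wording is an assumption.
class TimeoutWarningSketch {

  // Builds the warning from the configured timeout instead of a hard-coded "120 minutes".
  static String timeoutWarning(long timeoutInMS) {
    return String.format(
        "Commit operation did not complete within %d ms of all rewrite jobs completing",
        timeoutInMS);
  }

  // Returns null if the executor terminated in time, otherwise the warning to log.
  static String awaitOrWarn(ExecutorService service, long timeoutInMS) {
    service.shutdown();
    try {
      if (!service.awaitTermination(timeoutInMS, TimeUnit.MILLISECONDS)) {
        return timeoutWarning(timeoutInMS);
      }
      return null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

Deriving the message from the same variable that is passed to `awaitTermination` keeps the log accurate when callers supply a non-default timeout.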





[GitHub] [iceberg] szehon-ho commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1631216990

   Hm, would it make more sense as a code comment rather than a separate README file?
   
   @ConeyLiu can you check whether my understanding of the test case is right, and make those changes?
   
   I wanted to get this into 1.3.1, and maybe we can do a follow-up for docs.




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1262892827


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    int numberOfFileGroups = 100;
+    Tasks.range(numberOfFileGroups).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add file groups [0-3] for commit.
+    // There are less than the rewritesPerCommit, and thus will not trigger a commit action. Those
+    // file groups will be added to the completedRewrites queue.
+    // Now the queue has 4 file groups that need to commit.
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Add file groups [4-7] for commit
+    // These are gated to not be able to commit, so all those 4 file groups will be added to the
+    // queue as well.
+    // Now the queue has 8 file groups that need to commit.
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {
+      spyCommitService.offer(i);
+    }
+
+    // close commitService.
+    // This allows committerService thread to start to commit the remaining file groups [0-7] in the
+    // completedRewrites queue. And also the main thread waits for the committerService thread to
+    // finish within a timeout.
+
+    // The committerService thread commits file groups [0-4]. These will wait a fixed duration to

Review Comment:
   Sorry, this is my fault. Can we fix the groups? I guess the groups are 0-3 and 4-7.





[GitHub] [iceberg] ConeyLiu commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1611222254

   Hi, @szehon-ho @rdblue @aokolnychyi @RussellSpitzer could you help to review this when you are free? Thanks in advance.




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1246002428


##########
core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java:
##########
@@ -169,7 +169,11 @@ public void close() {
 
     if (!completedRewrites.isEmpty() && timeout) {
       LOG.error("Attempting to cleanup uncommitted file groups");
-      completedRewrites.forEach(this::abortFileGroup);
+      synchronized (completedRewrites) {
+        while (!completedRewrites.isEmpty()) {

Review Comment:
   @stevenzwu, we need to poll `completedRewrites` until it is empty; otherwise the `committerService` could still commit them.
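
The idea can be sketched as follows. This is a simplified, hypothetical model rather than the PR's code: file groups are plain integers, and both the committer path and the abort path drain the queue while holding the same monitor, so a group is removed exactly once and ends up either committed or aborted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the locking scheme; not the Iceberg implementation.
class AbortUnderLockSketch {
  private final Queue<Integer> completedRewrites = new ConcurrentLinkedQueue<>();
  final List<Integer> committed = new ArrayList<>();
  final List<Integer> aborted = new ArrayList<>();

  void offer(int fileGroup) {
    completedRewrites.add(fileGroup);
  }

  // Committer side: takes up to `max` groups while holding the queue's monitor.
  void commitBatch(int max) {
    synchronized (completedRewrites) {
      for (int i = 0; i < max && !completedRewrites.isEmpty(); i++) {
        committed.add(completedRewrites.poll());
      }
    }
  }

  // Abort side: polls the queue to empty under the same monitor, so nothing is left
  // behind for the committer to pick up afterwards.
  void abortRemaining() {
    synchronized (completedRewrites) {
      while (!completedRewrites.isEmpty()) {
        aborted.add(completedRewrites.poll());
      }
    }
  }
}
```

Iterating with `forEach` would abort the groups but leave them in the queue, which is why the sketch polls instead.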





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1256591419


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   So it took me a while to get this. This is to push it above the rewritesPerCommit limit, allowing the first batch to finally attempt a commit, right?



##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {
+      spyCommitService.offer(i);
+    }
+
+    Assertions.assertThatThrownBy(commitService::close)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Timeout occurred when waiting for commits");
+
+    // Wait for the committerService finish commit the remaining file groups
+    Awaitility.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInSameThread()
+        .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
+    Assertions.assertThat(commitService.results())
+        .doesNotContainAnyElementsOf(commitService.aborted);

Review Comment:
   Also a bit lost, what purpose does the timeout serve?  I thought the first 4 commits are blocked by rewritesPerCommit.



##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();

Review Comment:
   Is it possible to make a CustomCommitService implement canCreateCommitGroup() with a wait/notify (instead of using the rewritesPerCommit limit to trigger it after close, which is a bit hard to grasp)?
   
   Something like:
   ```
   CustomCommitService {
     private boolean canCreateCommitGroup() {
        object.wait();
     }
   }
   ...
   for (int i = 0; i < 4; i++) {
      service.submit(commitService.offer());
   }
   commitService.close();
   object.notify();
   ```
   
   I am not sure if that is what you are looking for, though, as it's a bit hard for me to follow the thinking behind the test.
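
For reference, a runnable sketch of the gating idea, under stated assumptions: it swaps raw `wait()`/`notify()` for a `CountDownLatch`, which avoids missed notifications, and it uses a hypothetical stand-in class rather than the real `CustomCommitService`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a test-only commit service; not the Iceberg class.
class GatedCommitSketch {
  private final CountDownLatch gate = new CountDownLatch(1);
  private final AtomicInteger commits = new AtomicInteger();

  // Blocks the calling (committer) thread until openGate() is called or 5 seconds pass.
  boolean canCreateCommitGroup() {
    try {
      return gate.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  void commitWhenAllowed() {
    if (canCreateCommitGroup()) {
      commits.incrementAndGet();
    }
  }

  void openGate() {
    gate.countDown();
  }

  int commits() {
    return commits.get();
  }

  // Returns {commits seen before opening the gate, commits after the committer finished}.
  static int[] demo() {
    GatedCommitSketch service = new GatedCommitSketch();
    ExecutorService committer = Executors.newSingleThreadExecutor();
    committer.execute(service::commitWhenAllowed);
    int before = service.commits(); // the gate is still closed, so nothing is committed yet
    service.openGate();
    committer.shutdown();
    try {
      committer.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new int[] {before, service.commits()};
  }
}
```

A test built this way controls exactly when the committer thread may proceed, instead of relying on the `rewritesPerCommit` threshold being crossed.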
   
   



##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);

Review Comment:
   Nit: make 100 a variable?





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1261248427


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    int numberOfFileGroups = 100;
+    Tasks.range(numberOfFileGroups).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the last group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {
+      spyCommitService.offer(i);
+    }
+
+    Assertions.assertThatThrownBy(commitService::close)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Timeout occurred when waiting for commits");
+
+    // Wait for the committerService finish commit the remaining file groups

Review Comment:
   Will update together





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1261244010


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   @szehon-ho I updated the comments, please take a look.
   
   ```java
   // Add file groups [0, 4) for commit.
   // There are fewer of these than rewritesPerCommit, so they will not trigger a commit action.
   // Those file groups will be added to the completedRewrites queue.
   // Now the queue has 4 file groups that need to be committed.
   for (int i = 0; i < 4; i++) {
     commitService.offer(i);
   }
   
   // Add file groups [4, 8) for commit.
   // These are gated so that they cannot be committed, so all 4 file groups will be added to the
   // queue as well.
   // Now the queue has 8 file groups that need to be committed.
   CustomCommitService spyCommitService = spy(commitService);
   doReturn(false).when(spyCommitService).canCreateCommitGroup();
   for (int i = 4; i < 8; i++) {
     spyCommitService.offer(i);
   }
   
   // Close commitService.
   // The committerService thread now starts to commit the remaining file groups in the
   // completedRewrites queue, while the main thread waits for the committerService thread to
   // finish within a timeout. There are 8 file groups that need to be committed. The first commit
   // covers file groups [0, 5) and waits a fixed duration to simulate a commit that takes longer
   // than the timeout.
   // This then tests the race between the main thread (trying to abort the remaining file groups
   // [5, 8)) and the committerService thread (trying to commit file groups [5, 8)).
   Assertions.assertThatThrownBy(commitService::close)
       .isInstanceOf(IllegalArgumentException.class)
       .hasMessageContaining("Timeout occurred when waiting for commits");
   
   // Wait for the commitService to finish: either all file groups are committed or the remaining
   // ones are aborted.
   Awaitility.await()
       .atMost(5, TimeUnit.SECONDS)
       .pollInSameThread()
       .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
   
   Assertions.assertThat(commitService.results())
       .doesNotContainAnyElementsOf(commitService.aborted);
   
   Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
   ```
   > I also feel the commitService results will not always have 0-4 then? But rather we should change the check to if they are either aborted or committed.
   
   File groups [0, 4] should be committed successfully; [5, 7], however, may be either aborted or committed. I think we need to keep the following:
   ```java
   Assertions.assertThat(commitService.results())
         .doesNotContainAnyElementsOf(commitService.aborted);
   ```
   This asserts that the committed and aborted file groups do not overlap.
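
The close-with-timeout step described in the test comments above can be illustrated in isolation. This is a minimal sketch only: it assumes the wait is built on `ExecutorService.awaitTermination`, which this thread does not show, and the class and method names (`CloseTimeoutSketch`, `awaitCommitter`, `slowCommitFinishesInTime`) are hypothetical. A latch stands in for the "fixed duration" of the slow commit so that the outcome does not depend on sleep timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the actual BaseCommitService code.
class CloseTimeoutSketch {

  // Stops accepting new work and waits for the committer thread to finish.
  // Returns true if it finished within the timeout, false if the wait timed out.
  static boolean awaitCommitter(ExecutorService committer, long timeoutMs) {
    committer.shutdown();
    try {
      return committer.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Simulates a commit that outlives the timeout: the committer blocks on a latch
  // that is released only after the main thread has given up waiting.
  static boolean slowCommitFinishesInTime() {
    CountDownLatch release = new CountDownLatch(1);
    ExecutorService committer = Executors.newSingleThreadExecutor();
    committer.execute(() -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    boolean finished = awaitCommitter(committer, 50);
    // The committer is still running at this point, which is the window in which
    // the abort in close() can race with a later commit.
    release.countDown();
    awaitCommitter(committer, 5_000);
    return finished;
  }
}
```

After a timed-out wait the committer thread is still alive, which is why the abort path and the commit path need to agree on a lock.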
   





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1263050142


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    int numberOfFileGroups = 100;
+    Tasks.range(numberOfFileGroups).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add file groups [0-3] for commit.
+    // There are less than the rewritesPerCommit, and thus will not trigger a commit action. Those
+    // file groups will be added to the completedRewrites queue.
+    // Now the queue has 4 file groups that need to commit.
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Add file groups [4-7] for commit
+    // These are gated to not be able to commit, so all those 4 file groups will be added to the
+    // queue as well.
+    // Now the queue has 8 file groups that need to commit.
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {
+      spyCommitService.offer(i);
+    }
+
+    // close commitService.
+    // This allows committerService thread to start to commit the remaining file groups [0-7] in the
+    // completedRewrites queue. And also the main thread waits for the committerService thread to
+    // finish within a timeout.
+
+    // The committerService thread commits file groups [0-4]. These will wait a fixed duration to

Review Comment:
   Sorry, yeah, long day :) I saw the 0-3 up top and thought it was the same.





[GitHub] [iceberg] RussellSpitzer commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1615223692

   Ideally I think we should have a test for this edge case, is there any chance we can get one of those added in?




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1248836480


##########
core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java:
##########
@@ -169,7 +169,11 @@ public void close() {
 
     if (!completedRewrites.isEmpty() && timeout) {
       LOG.error("Attempting to cleanup uncommitted file groups");
-      completedRewrites.forEach(this::abortFileGroup);
+      synchronized (completedRewrites) {
+        while (!completedRewrites.isEmpty()) {

Review Comment:
   @RussellSpitzer that's right.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1261944672


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   > but I guess its just to prevent the main thread from commiting immediately. 
   
   Actually, this is to prevent commits from the rewrite thread pool.
   
   > and committerService thread can commit whenever the condition is triggered as its not using the spy?
   
   That's right.
   





[GitHub] [iceberg] szehon-ho merged pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho merged PR #7933:
URL: https://github.com/apache/iceberg/pull/7933




[GitHub] [iceberg] ConeyLiu commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1611221031

   Here, the committerService starts committing only after all the rewrite jobs have finished. I am not sure whether we should let it commit from the moment the rewrite job starts, because that could increase conflicts with the rewrite job's commit requests. See https://github.com/apache/iceberg/blob/51eaf6806361e6e0a5cd163071dce684ec05350b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java#L115




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1245941220


##########
core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java:
##########
@@ -169,7 +169,11 @@ public void close() {
 
     if (!completedRewrites.isEmpty() && timeout) {
       LOG.error("Attempting to cleanup uncommitted file groups");
-      completedRewrites.forEach(this::abortFileGroup);
+      synchronized (completedRewrites) {
+        while (!completedRewrites.isEmpty()) {

Review Comment:
   The `synchronized` looks good to me.
   
   Regarding the while loop, I'm curious: why not stay with the original `forEach`?
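
One observable difference between the two forms can be sketched as follows. This is an illustration under assumptions: `completedRewrites` is taken to be a `java.util.Queue` (its declaration is not shown in this thread), the loop body is assumed to poll and abort each element, and the class and method names are hypothetical. Whether this difference motivated the change is not stated in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch; not the actual BaseCommitService code.
class DrainUnderLockSketch {

  // Poll-loop form: removes every element while holding the queue's monitor.
  // A committer thread that synchronizes on the same queue cannot pick up an
  // element that is being aborted here, and the queue is empty afterwards.
  static <T> List<T> abortAll(Queue<T> completedRewrites) {
    List<T> aborted = new ArrayList<>();
    synchronized (completedRewrites) {
      while (!completedRewrites.isEmpty()) {
        aborted.add(completedRewrites.poll());
      }
    }
    return aborted;
  }

  // forEach form: visits every element under the same lock but leaves them in
  // the queue, so they are still visible to other threads after the lock is released.
  static <T> List<T> abortAllWithoutRemoving(Queue<T> completedRewrites) {
    List<T> aborted = new ArrayList<>();
    synchronized (completedRewrites) {
      completedRewrites.forEach(aborted::add);
    }
    return aborted;
  }
}
```

In both forms the list returned stands in for the calls to `abortFileGroup`; only the poll loop leaves nothing behind for a later commit attempt.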





[GitHub] [iceberg] chenjunjiedada commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1249606989


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {

Review Comment:
   Nice UT!





[GitHub] [iceberg] chenjunjiedada commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1260515180


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   nit: I think the original logic only has 8 groups in total.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1261244010


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   @szehon-ho I updated the comments, please take a look.
   
   ```java
   // Add file groups [0, 4) for commit.
   // This is fewer than rewritesPerCommit, so no commit is triggered. These file groups
   // are added to the completedRewrites queue.
   // Now the queue holds 4 file groups waiting to be committed.
   for (int i = 0; i < 4; i++) {
     commitService.offer(i);
   }
   
   // Add file groups [4, 8) for commit.
   // These offers are gated so that they cannot trigger a commit, so all 4 file groups are added to the queue as well.
   // Now the queue holds 8 file groups waiting to be committed.
   CustomCommitService spyCommitService = spy(commitService);
   doReturn(false).when(spyCommitService).canCreateCommitGroup();
   for (int i = 4; i < 8; i++) {
     spyCommitService.offer(i);
   }
   
   // Close commitService.
   // The committerService thread now starts to commit the remaining file groups in the completedRewrites queue,
   // while the main thread waits for the committerService thread to finish within a timeout. There are 8 file groups
   // waiting to be committed. The first commit covers file groups [0, 5) and waits a fixed duration to simulate
   // a commit that takes longer than the timeout.
   // This exercises the race between the main thread (trying to abort the remaining file groups [5, 8)) and the
   // committerService thread (trying to commit file groups [5, 8)).
   Assertions.assertThatThrownBy(commitService::close)
       .isInstanceOf(IllegalArgumentException.class)
       .hasMessageContaining("Timeout occurred when waiting for commits");
   
   // Wait for the commitService to finish: either all file groups are committed or the remaining ones are aborted.
   Awaitility.await()
       .atMost(5, TimeUnit.SECONDS)
       .pollInSameThread()
       .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
   
   Assertions.assertThat(commitService.results())
       .doesNotContainAnyElementsOf(commitService.aborted);
   
   Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
   ```
   > I also feel the commitService results will not always have 0-4 then? But rather we should change the check to if they are either aborted or committed.
   
   File groups [0, 4] should be committed successfully; [5, 7], however, may be either aborted or committed. I think we need to keep the following:
   ```
   Assertions.assertThat(commitService.results())
         .doesNotContainAnyElementsOf(commitService.aborted);
   ```
   This asserts that the committed and aborted file groups do not overlap.
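
The invariant discussed here (no file group is both committed and aborted) can also be expressed as a small standalone check in plain Java. This is a sketch only: `OutcomeCheckSketch` and its methods are hypothetical helpers, not part of the PR, and the parameters `offered`, `committed` and `aborted` are assumed names for the corresponding collections in the test.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Iceberg code base.
class OutcomeCheckSketch {

  // True when no file group appears in both the committed and the aborted collection.
  static <T> boolean noOverlap(Collection<T> committed, Collection<T> aborted) {
    return Collections.disjoint(committed, aborted);
  }

  // True when every offered file group ended up committed or aborted, and none in both.
  static <T> boolean eachExactlyOnce(
      Set<T> offered, Collection<T> committed, Collection<T> aborted) {
    Set<T> seen = new HashSet<>(committed);
    seen.addAll(aborted);
    return noOverlap(committed, aborted) && seen.equals(offered);
  }
}
```

The second method is the stricter form raised in review: it accepts any split of the later file groups between the two outcomes, as long as each group lands in exactly one of them.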
   





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1261668243


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   Suggestion to make it a bit shorter.
   
   ```
   // close commitService. 
   // The committerService thread now starts to commit those remaining file groups in the completedRewrites queue.
   // And also the main thread waits for the committerService thread to finish within a timeout. There are 8 file groups
   // that need to commit. The first committing will commit file groups [0, 5]. This will wait a fixed duration to simulate
   // the time cost exceeding the timeout. 
   // Then testing race conditions between the main thread (trying to abort remaining file groups[6, 8)), and the 
   // committerService thread (trying to commit file groups[6, 8))
   ```
   =>
   ```
   // close commitService. 
   // This allows the committerService thread to start committing the remaining file groups [0-8] in the completedRewrites queue.
   // Meanwhile, the main thread waits for the committerService thread to finish within a timeout.
   
   // The committerService thread commits file groups [0, 5]. These wait a fixed duration to
   // simulate a timeout on the main thread, which then tries to abort file groups [6-8].
   // This tests the race condition, as the committerService is also trying to commit groups [6-8].
   ```





[GitHub] [iceberg] szehon-ho commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1633133064

   Thanks @ConeyLiu for patiently answering my questions; the concurrent test was a bit difficult for me to follow :). I'm OK with the comment you added; I just left one more suggestion. I'm also working on another pending 1.3.1 issue, so we should be OK.




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1263026457


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    int numberOfFileGroups = 100;
+    Tasks.range(numberOfFileGroups).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add file groups [0-3] for commit.
+    // There are less than the rewritesPerCommit, and thus will not trigger a commit action. Those
+    // file groups will be added to the completedRewrites queue.
+    // Now the queue has 4 file groups that need to commit.
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Add file groups [4-7] for commit
+    // These are gated to not be able to commit, so all those 4 file groups will be added to the
+    // queue as well.
+    // Now the queue has 8 file groups that need to commit.
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {
+      spyCommitService.offer(i);
+    }
+
+    // close commitService.
+    // This allows committerService thread to start to commit the remaining file groups [0-7] in the
+    // completedRewrites queue. And also the main thread waits for the committerService thread to
+    // finish within a timeout.
+
+    // The committerService thread commits file groups [0-4]. These will wait a fixed duration to

Review Comment:
   It should be [0-4] because the `rewritesPerCommit` is 5.
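
The batch boundary can be checked with a small standalone sketch. It assumes the committer simply drains up to `rewritesPerCommit` entries per commit; `CommitBatchSketch` and `nextBatch` are hypothetical names for illustration, not Iceberg code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration only; not the Iceberg commit service.
class CommitBatchSketch {
  // Drain at most rewritesPerCommit file groups from the queue into one commit batch.
  static List<Integer> nextBatch(Queue<Integer> completedRewrites, int rewritesPerCommit) {
    List<Integer> batch = new ArrayList<>();
    while (batch.size() < rewritesPerCommit && !completedRewrites.isEmpty()) {
      batch.add(completedRewrites.poll());
    }
    return batch;
  }

  public static void main(String[] args) {
    Queue<Integer> completedRewrites = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < 8; i++) {
      completedRewrites.add(i);
    }
    // With rewritesPerCommit = 5 the first batch holds groups 0..4, the second 5..7.
    System.out.println(nextBatch(completedRewrites, 5));
    System.out.println(nextBatch(completedRewrites, 5));
  }
}
```

With 8 queued groups, that leaves groups 5-7 as the ones the timeout-triggered abort and the committer can compete for.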





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1257209676


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   Yes, the problem can only be triggered when more than `rewritesPerCommit` file groups are in the queue waiting to be committed by the `committerService`. For example, suppose `rewritesPerCommit` is 5 and 10 file groups need to be committed by the `committerService`. The timeout occurs during the first commit (groups 0 to 4). The main thread then deletes the remaining 5 file groups (5 to 9). However, the `committerService` has not stopped, so it can still commit file groups 5 to 9.
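
The idea behind the fix can be sketched in isolation. The following is a minimal model, assuming the committer holds a lock for the duration of each batch commit and the abort path takes that same lock; `SharedLockAbortSketch`, `commitNextBatch`, and `abortRemaining` are hypothetical names and the sleeps stand in for real commit latency. It is not the actual Iceberg implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for the commit service; not the Iceberg class.
class SharedLockAbortSketch {
  private final Object lock = new Object();
  private final Queue<Integer> completedRewrites = new ArrayDeque<>(); // guarded by lock
  private final List<Integer> committed = new ArrayList<>(); // guarded by lock
  private final List<Integer> aborted = new ArrayList<>(); // guarded by lock
  private final int rewritesPerCommit;

  SharedLockAbortSketch(int rewritesPerCommit) {
    this.rewritesPerCommit = rewritesPerCommit;
  }

  void offer(int fileGroup) {
    synchronized (lock) {
      completedRewrites.add(fileGroup);
    }
  }

  // Committer thread: take up to rewritesPerCommit groups and "commit" them.
  // Returns false once the queue is empty.
  boolean commitNextBatch(long commitMillis) {
    synchronized (lock) {
      if (completedRewrites.isEmpty()) {
        return false;
      }
      List<Integer> batch = new ArrayList<>();
      while (batch.size() < rewritesPerCommit && !completedRewrites.isEmpty()) {
        batch.add(completedRewrites.poll());
      }
      sleep(commitMillis); // simulate a slow commit
      committed.addAll(batch);
      return true;
    }
  }

  // Main thread after a timeout: abort whatever is still queued, under the same lock.
  void abortRemaining() {
    synchronized (lock) {
      while (!completedRewrites.isEmpty()) {
        aborted.add(completedRewrites.poll());
      }
    }
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Runs the scenario and reports whether every group ended up in exactly one list.
  static boolean demo() {
    SharedLockAbortSketch service = new SharedLockAbortSketch(5);
    for (int i = 0; i < 10; i++) {
      service.offer(i);
    }
    Thread committer = new Thread(() -> {
      while (service.commitNextBatch(50)) {
        // keep committing until the queue is empty
      }
    });
    committer.start();
    sleep(10); // the "timeout": shorter than one commit
    service.abortRemaining();
    try {
      committer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    synchronized (service.lock) {
      Set<Integer> overlap = new HashSet<>(service.committed);
      overlap.retainAll(service.aborted);
      return overlap.isEmpty() && service.committed.size() + service.aborted.size() == 10;
    }
  }

  public static void main(String[] args) {
    System.out.println("each group committed or aborted exactly once: " + demo());
  }
}
```

Which groups end up committed versus aborted depends on thread scheduling; the point of the shared lock is only that no group can be in both sets.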





[GitHub] [iceberg] szehon-ho commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1635052735

   Merged, thanks @ConeyLiu, and thanks all for the reviews!
   
   @ConeyLiu can you open a PR to merge this to 1.3.1?




[GitHub] [iceberg] ConeyLiu commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1623853592

   I submitted a follow-up, #8001, to address commits that fail because of a concurrent commit.




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1258662864


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   > However, the committerService is not stopped and it still could commit the file groups (5 to 9).
   
   Is that true? I don't see how groups 5-9 would ever proceed, because we always return false for canCreateCommitGroup() (I also ran the test to check). Am I missing something?
   
   I thought the race condition was between committing and aborting the 0-4 group.
   
   If that is true, how about adding these comments?
   
   
   ```
   // Add file groups 0-4 for commit.
   // There are fewer than rewritesPerCommit, so commitService will not commit until the next batch is added.
   // During commit, these will sleep a fixed duration, to test the race condition with the abort on close (below)
   for (int i = 0; i < 4; i++) {
     commitService.offer(i);
   }
   
   // Add file groups 5-9 for commit
   // These are gated so that they cannot commit,
   // and only serve to allow file groups 0-4 to proceed
   CustomCommitService spyCommitService = spy(commitService);
   doReturn(false).when(spyCommitService).canCreateCommitGroup();
   for (int i = 4; i < 8; i++) {
     spyCommitService.offer(i);
   }
   
   // close commitService.
   // This will wait a fixed duration to abort file groups 0-4,
   // testing the race condition as they attempt to finish committing (above)
   Assertions.assertThatThrownBy(commitService::close)
       .isInstanceOf(IllegalArgumentException.class)
       .hasMessageContaining("Timeout occurred when waiting for commits");
   
   // Wait for the commitService to finish trying to commit file groups
   Awaitility.await()
       .atMost(5, TimeUnit.SECONDS)
       .pollInSameThread()
       .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
   
   Assertions.assertThat(commitService.results())
       .doesNotContainAnyElementsOf(commitService.aborted);
   
   Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
   ```
   
   Can you check whether these comments and my understanding are right?
   
   I also suspect the commitService results will not always contain 0-4, then? If so, we should instead change the check to verify that each group is either aborted or committed.





[GitHub] [iceberg] ConeyLiu commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1633501777

   Thanks @szehon-ho for taking the time to do the detailed review. The comments have been added.




[GitHub] [iceberg] ConeyLiu commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1631739930

   Thanks @szehon-ho for the detailed review; I will address the comments today.






[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1261654306


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   Thanks, that makes the test easier to follow. Now I get that on close, there's an extra condition in the main commitService loop to commit those.
   
   To me, this is the most confusing part:
   ```
   doReturn(false).when(spyCommitService).canCreateCommitGroup();
   ```
   I thought this would prevent groups 4-8 from ever proceeding, but I guess it's just to prevent the main thread from committing immediately. They still get onto the completedRewrites list, and the committerService thread can commit them whenever the condition is triggered, as it's not using the spy?
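
A rough way to picture that reading, assuming it is correct: Mockito's `spy` works on a copy of the object whose fields reference the same underlying state, so a stub on the spy gates only calls made through the spy. The sketch below imitates that with a hand-written subclass instead of Mockito; `SpyGateSketch`, `Service`, and `GatedCopy` are hypothetical names, and the `offer` logic is a simplification rather than the real commit service.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration of "gated copy sharing the same queue"; no Mockito involved.
class SpyGateSketch {
  static class Service {
    final Queue<Integer> completedRewrites;
    final int rewritesPerCommit;

    Service(Queue<Integer> completedRewrites, int rewritesPerCommit) {
      this.completedRewrites = completedRewrites;
      this.rewritesPerCommit = rewritesPerCommit;
    }

    boolean canCreateCommitGroup() {
      return completedRewrites.size() >= rewritesPerCommit;
    }

    // Enqueue a file group; the return value says whether this call would trigger a commit.
    boolean offer(int fileGroup) {
      completedRewrites.add(fileGroup);
      return canCreateCommitGroup();
    }
  }

  // Stand-in for the spy: same queue reference, but the readiness check is forced to false.
  static class GatedCopy extends Service {
    GatedCopy(Service original) {
      super(original.completedRewrites, original.rewritesPerCommit);
    }

    @Override
    boolean canCreateCommitGroup() {
      return false;
    }
  }

  public static void main(String[] args) {
    Service original = new Service(new ArrayDeque<>(), 5);
    Service gated = new GatedCopy(original);

    for (int i = 0; i < 4; i++) {
      original.offer(i);
    }
    boolean triggered = false;
    for (int i = 4; i < 8; i++) {
      triggered |= gated.offer(i);
    }

    System.out.println("offers through the gated copy triggered a commit: " + triggered);
    System.out.println("groups visible to the original: " + original.completedRewrites.size());
    System.out.println("original sees a ready batch: " + original.canCreateCommitGroup());
  }
}
```

In this model the gated offers never report a ready batch, yet all 8 groups sit in the queue the original object reads, which matches the behavior described in the comment.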
   





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1261654306


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   Thanks, this makes the test easier to follow. Now I get that on close, there's an extra condition in the main commitService loop to commit those.
   
   To me, this is the most confusing part:
   ```
   doReturn(false).when(spyCommitService).canCreateCommitGroup();
   ```
   I thought this would prevent groups 4-7 from ever proceeding, but I guess it's just to prevent the main thread from committing immediately. They still get onto the committedRewrites list, and the background thread can commit whenever the condition is triggered, since it's not using the spy?
   





[GitHub] [iceberg] chenjunjiedada commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1249606646


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();

Review Comment:
   Makes sense.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1256605582


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();

Review Comment:
   Is it possible to make CustomCommitService implement canCreateCommitGroup() with a wait/notify (instead of using the rewritesPerCommit limit to trigger it at the same time as close, which is a bit hard to grasp)?
   
   Something like:
   ```
   CustomCommitService {
     private boolean canCreateCommitGroup() {
        object.wait();
     }
   }
   ...
   for (int i = 0; i < 4; i++) {
      service.submit(commitService.offer());
   }
   commitService.close();
   object.notify();
   ```
   
   I am not sure if that is what you are looking for, though, as it's a bit hard for me to follow the thinking behind the test.
   
   





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1248305559


##########
core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java:
##########
@@ -169,7 +169,11 @@ public void close() {
 
     if (!completedRewrites.isEmpty() && timeout) {
       LOG.error("Attempting to cleanup uncommitted file groups");
-      completedRewrites.forEach(this::abortFileGroup);
+      synchronized (completedRewrites) {
+        while (!completedRewrites.isEmpty()) {

Review Comment:
   The issue is we could leave the synchronized block and then touch completedRewrites? 





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1254351771


##########
core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java:
##########
@@ -154,7 +171,7 @@ public void close() {
       // the commit pool to finish doing its commits to Iceberg State. In the case of partial
       // progress this should have been occurring simultaneously with rewrites, if not there should
       // be only a single commit operation.
-      if (!committerService.awaitTermination(120, TimeUnit.MINUTES)) {
+      if (!committerService.awaitTermination(timeoutInMS, TimeUnit.MILLISECONDS)) {

Review Comment:
   Updated the warning.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1257211519


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {
+      spyCommitService.offer(i);
+    }
+
+    Assertions.assertThatThrownBy(commitService::close)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Timeout occurred when waiting for commits");
+
+    // Wait for the committerService finish commit the remaining file groups
+    Awaitility.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInSameThread()
+        .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
+    Assertions.assertThat(commitService.results())
+        .doesNotContainAnyElementsOf(commitService.aborted);

Review Comment:
   The main thread cleans up uncommitted file groups after the timeout. At that point the main thread and `committerService` can both touch `completedRewrites` concurrently, and that's the root problem.
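
To make the race concrete, here is a minimal, self-contained sketch of the idea behind the fix. It is not the actual `BaseCommitService` code: the class `SharedLockAbortSketch`, its fields, and its methods are hypothetical stand-ins, and "commit" and "abort" just record the group id. The only point it illustrates is that both threads take items out of the shared queue while holding the same monitor, so a group can end up committed or aborted, but not both.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a commit service; names are illustrative only.
class SharedLockAbortSketch {
  private final Queue<Integer> completedRewrites = new ConcurrentLinkedQueue<>();
  final List<Integer> committed = new ArrayList<>(); // guarded by completedRewrites
  final List<Integer> aborted = new ArrayList<>(); // guarded by completedRewrites

  void offer(int group) {
    completedRewrites.add(group);
  }

  // Committer side: take up to batchSize groups while holding the queue's monitor.
  void commitBatch(int batchSize) {
    synchronized (completedRewrites) {
      for (int i = 0; i < batchSize && !completedRewrites.isEmpty(); i++) {
        committed.add(completedRewrites.poll());
      }
    }
  }

  // Aborting side: drain whatever is left under the same monitor.
  void abortRemaining() {
    synchronized (completedRewrites) {
      while (!completedRewrites.isEmpty()) {
        aborted.add(completedRewrites.poll());
      }
    }
  }

  // Runs a committer thread against an aborting caller and checks the invariant.
  static boolean run() {
    SharedLockAbortSketch sketch = new SharedLockAbortSketch();
    for (int i = 0; i < 1000; i++) {
      sketch.offer(i);
    }
    Thread committer = new Thread(() -> {
      for (int i = 0; i < 100; i++) {
        sketch.commitBatch(5);
      }
    });
    committer.start();
    sketch.abortRemaining();
    try {
      committer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    Set<Integer> overlap = new HashSet<>(sketch.committed);
    overlap.retainAll(sketch.aborted);
    // Every group is handled exactly once, and never by both sides.
    return overlap.isEmpty() && sketch.committed.size() + sketch.aborted.size() == 1000;
  }

  public static void main(String[] args) {
    System.out.println("invariant holds: " + run());
  }
}
```

How many groups land on each side depends on thread scheduling; the sketch only checks that the two sets never overlap.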





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1257210705


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();

Review Comment:
   I tried `CountDownLatch` and similar approaches, but it was hard to trigger the problem with them. That's why I used the spy here.
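
For readers unfamiliar with the alternative mentioned here, this is roughly what a latch-based gate looks like. It is a generic sketch, not the code that was tried in this PR: `LatchGateSketch` and its variables are hypothetical, and the "commit" is just a flag being set once the gate opens.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of gating a background thread with a latch.
class LatchGateSketch {
  // Returns {committedBeforeRelease, committedAfterRelease}.
  static boolean[] run() {
    CountDownLatch gate = new CountDownLatch(1); // closed until the test opens it
    CountDownLatch done = new CountDownLatch(1); // signals that the worker finished
    AtomicBoolean committed = new AtomicBoolean(false);

    Thread committer = new Thread(() -> {
      try {
        gate.await(); // stand-in for a gated "can I commit yet?" check
        committed.set(true);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        done.countDown();
      }
    });
    committer.start();

    boolean before = committed.get(); // gate is still closed at this point
    gate.countDown(); // open the gate
    try {
      done.await(5, TimeUnit.SECONDS); // wait for the worker to finish
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new boolean[] {before, committed.get()};
  }

  public static void main(String[] args) {
    boolean[] result = run();
    System.out.println("before=" + result[0] + ", after=" + result[1]);
  }
}
```

A gate like this pins down one fixed ordering between the two threads. That may be part of why it was hard to provoke a bug that depends on the abort and the commit overlapping, though this is a guess and not something stated in the thread.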





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1257204238


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);

Review Comment:
   Updated





[GitHub] [iceberg] ConeyLiu commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1632609309

   Thanks @szehon-ho @chenjunjiedada, I have replied in the comment threads. Please take a look. I will update the code if it is OK.




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1261654306


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {

Review Comment:
   Thanks, this makes the test easier to follow. Now I get that on close, there's an extra condition in the main commitService loop to commit those.
   
   To me, this is the most confusing part:
   ```
   doReturn(false).when(spyCommitService).canCreateCommitGroup();
   ```
   I thought this would prevent groups 4-7 from ever proceeding, but I guess it's just to prevent the main thread from committing immediately. They still get onto the list, and the background thread can commit whenever the condition is triggered?
   





[GitHub] [iceberg] chenjunjiedada commented on pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "chenjunjiedada (via GitHub)" <gi...@apache.org>.
chenjunjiedada commented on PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#issuecomment-1615566949

   @ConeyLiu @RussellSpitzer +1 to adding a unit test to guard the logic. I think we could make the timeout threshold (120 minutes right now) configurable so that it is a bit easier to reproduce the problem in a unit test.
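
As a rough illustration of why a configurable timeout helps in tests, the sketch below passes the wait time to `ExecutorService.awaitTermination` as a parameter, so a test can choose a few milliseconds where production would wait much longer. The class `ConfigurableTimeoutSketch` and its helper methods are hypothetical; only the `timeoutInMS` name is borrowed from the diff in this thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing a close() whose timeout is supplied by the caller.
class ConfigurableTimeoutSketch {
  // Returns true if the pool finished in time, false if the wait timed out.
  static boolean closeWithTimeout(ExecutorService pool, long timeoutInMS) {
    pool.shutdown(); // stop accepting work, let queued tasks run
    try {
      return pool.awaitTermination(timeoutInMS, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Submits one task that takes taskMillis and closes with the given timeout.
  static boolean demo(long taskMillis, long timeoutInMS) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.submit(() -> {
      try {
        Thread.sleep(taskMillis); // simulated slow commit
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    boolean finished = closeWithTimeout(pool, timeoutInMS);
    pool.shutdownNow(); // do not leave the simulated commit running
    return finished;
  }

  public static void main(String[] args) {
    System.out.println("slow task, short timeout: " + demo(2000, 50));
    System.out.println("fast task, long timeout: " + demo(0, 5000));
  }
}
```

With a short timeout the slow task reliably trips the timeout branch, which is the path a test of the abort logic needs to reach.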




[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7933: Core: Abort file groups should be under same lock as committerService

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7933:
URL: https://github.com/apache/iceberg/pull/7933#discussion_r1248854888


##########
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Tasks.range(100).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add the number of less than rewritesPerCommit
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Simulate the latest group of rewrite
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();

Review Comment:
   @RussellSpitzer @chenjunjiedada I have to mock the behavior. It is really difficult to simulate otherwise.


