Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/29 14:17:03 UTC

[GitHub] [beam] rvballada opened a new pull request, #22932: Io jms fix ack message checkpoint

rvballada opened a new pull request, #22932:
URL: https://github.com/apache/beam/pull/22932

   Hi @lukecwik, would you kindly review this PR, which relates to the following issue:
   https://github.com/apache/beam/issues/20814
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ X] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ X] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ X] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik merged pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik merged PR #22932:
URL: https://github.com/apache/beam/pull/22932




[GitHub] [beam] github-actions[bot] commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1241903789

   Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @apilloud for label java.
   R: @johnjcasey for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r983013968


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(oldestMessageTimestamp);
+    return readerHash;

Review Comment:
   Changing the fields stored in the object will change its serialVersionUID and make it incompatible with pipeline update.
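
The compatibility concern above applies when a class relies on the default, computed serialVersionUID, which is derived from the class structure, fields included. Whether JmsCheckpointMark declares the constant explicitly is not shown in this thread, so the following is only a minimal sketch with a hypothetical class (`PinnedMark`, not the Beam class). It pins the UID by hand and rebuilds a transient list on deserialization, in the same way as the `readObject` method removed in the quoted diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a checkpoint mark; this is not the Beam class.
final class PinnedMark implements Serializable {
  // Declared explicitly, so adding or removing fields does not change it.
  private static final long serialVersionUID = 1L;

  long oldestTimestampMillis;
  // Transient: never written to the stream, rebuilt in readObject.
  transient List<String> messages = new ArrayList<>();

  PinnedMark(long oldestTimestampMillis) {
    this.oldestTimestampMillis = oldestTimestampMillis;
  }

  // Field initializers do not run on deserialization, so reset the list here.
  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    messages = new ArrayList<>();
  }

  // Serializes and deserializes a mark in memory.
  static PinnedMark roundTrip(PinnedMark mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (PinnedMark) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // Returns the UID the serialization runtime will use for this class.
  static long declaredUid() {
    return ObjectStreamClass.lookup(PinnedMark.class).getSerialVersionUID();
  }

  public static void main(String[] args) {
    PinnedMark copy = roundTrip(new PinnedMark(42L));
    System.out.println(copy.oldestTimestampMillis + " " + copy.messages + " " + declaredUid());
  }
}
```

Pinning the UID only keeps the stream identifier stable; whether old serialized state can still be read after a field change also depends on which fields were added or removed.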





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r989858210


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +58,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {

Review Comment:
   I went back to the previous solution, so this is no longer relevant.
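
For reference, the quoted condition `reader.active.get() && reader != null` reads `reader.active` before testing `reader` for null, so the null test can never prevent a NullPointerException. A minimal sketch of the safer ordering follows; `NullSafeCheckSketch`, `Reader`, and `canAck` are hypothetical names used only for illustration, not code from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class NullSafeCheckSketch {
  // Hypothetical reader exposing the `active` flag seen in the quoted diff.
  static final class Reader {
    final AtomicBoolean active = new AtomicBoolean(true);
  }

  // The null test comes first; && short-circuits, so `active` is only
  // read when the reader is known to be non-null.
  static boolean canAck(Reader reader) {
    return reader != null && reader.active.get();
  }

  public static void main(String[] args) {
    System.out.println(canAck(null));
    System.out.println(canAck(new Reader()));
  }
}
```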





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r984698349


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +58,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {

Review Comment:
   I am going to go back to the initial JmsCheckpointMark; it is simple and avoids dependencies between the checkpoint and the reader.





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r982349901


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(oldestMessageTimestamp);
+    return readerHash;

Review Comment:
   Yes, readerHash = System.identityHashCode(reader); it is computed once in the constructor.
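
The approach described in this comment, which the author later abandoned, can be sketched as below. `IdentityKeyedMark` is a hypothetical class written for illustration; only the `readerHash` field name and the shape of `equals`/`hashCode` are taken from the quoted diff.

```java
// Hypothetical mark whose identity is tied to the reader that created it.
final class IdentityKeyedMark {
  private final int readerHash;

  IdentityKeyedMark(Object reader) {
    // Computed once, so the mark stays comparable even after the
    // reference to the reader itself has been dropped.
    this.readerHash = System.identityHashCode(reader);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof IdentityKeyedMark)) {
      return false;
    }
    return readerHash == ((IdentityKeyedMark) o).readerHash;
  }

  @Override
  public int hashCode() {
    return readerHash;
  }

  public static void main(String[] args) {
    Object reader = new Object();
    System.out.println(new IdentityKeyedMark(reader).equals(new IdentityKeyedMark(reader)));
  }
}
```

Two caveats apply to this design: identity hash codes are not guaranteed to be unique across distinct objects, and the value only has meaning inside the JVM that produced it, so it carries no information once a mark has been serialized and restored elsewhere.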





[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r998678381


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -21,9 +21,14 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;

Review Comment:
   ```suggestion
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.times;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.verifyNoMoreInteractions;
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -67,9 +75,12 @@
 import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.joda.time.Duration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;

Review Comment:
   ```suggestion
   import org.junit.Test;
   import org.junit.runner.RunWith;
   import org.junit.runners.JUnit4;
   import org.mockito.ArgumentCaptor;
   import org.mockito.Mockito;
   ```





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r989857252


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -463,23 +469,24 @@ public Coder<T> getOutputCoder() {
   static class UnboundedJmsReader<T> extends UnboundedReader<T> {
 
     private UnboundedJmsSource<T> source;
-    private JmsCheckpointMark checkpointMark;
     private Connection connection;
     private Session session;
     private MessageConsumer consumer;
     private AutoScaler autoScaler;
 
     private T currentMessage;
+    private Message currentJmsMessage;
     private Instant currentTimestamp;
 
-    public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
+    Set<Message> messagesToAck;

Review Comment:
   Yes, it will work as the previous solution did.





[GitHub] [beam] github-actions[bot] commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1257941026

   Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kileys for label java.
   R: @pabloem for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] rvballada commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1283662309

   Thanks @lukecwik !




[GitHub] [beam] lukecwik commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1278148211

   Sorry for the long wait, lots of other issues blocked me from reviewing this. Taking a follow-up look now.




[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r995438872


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -582,29 +608,85 @@ public long getTotalBacklogBytes() {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
+      doClose();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    private void doClose() {
+      if (active.get()) {
+        try {
+          closeAutoscaler();
+          closeConsumer();
+          ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Review Comment:
   Ok





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r995439079


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +376,10 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
       return builder().setAutoScaler(autoScaler).build();
     }
 
+    public Read<T> withCloseTimeout(long closeTimeout) {

Review Comment:
   Yes, of course.





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r983096343


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(oldestMessageTimestamp);
+    return readerHash;

Review Comment:
   OK, I think I will go back to the old JmsCheckpointMark implementation and only add a delay before closing the session, plus the possibility of discarding the checkpoint after the delay.
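
The plan in this comment (defer the session close, and drop a checkpoint that is finalized after the session has gone) can be sketched roughly as follows. This is not the code merged in the PR: `DelayedCloseSketch`, `receive`, `closeNow`, and the use of strings in place of JMS messages are all assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reader; strings stand in for JMS messages.
final class DelayedCloseSketch {
  private final AtomicBoolean active = new AtomicBoolean(true);
  private final List<String> pending = new ArrayList<>();
  private final List<String> acked = new ArrayList<>();
  private final ScheduledExecutorService executor;
  private final long closeTimeoutMillis;

  DelayedCloseSketch(ScheduledExecutorService executor, long closeTimeoutMillis) {
    this.executor = executor;
    this.closeTimeoutMillis = closeTimeoutMillis;
  }

  synchronized void receive(String message) {
    pending.add(message);
  }

  // Acknowledges pending messages while the session is still open and
  // discards them otherwise. Returns the number acknowledged.
  synchronized int finalizeCheckpoint() {
    int count = 0;
    if (active.get()) {
      acked.addAll(pending);
      count = pending.size();
    }
    pending.clear();
    return count;
  }

  // Defers the real close so that a checkpoint finalized shortly after
  // close() can still acknowledge its messages.
  void close() {
    executor.schedule(this::closeNow, closeTimeoutMillis, TimeUnit.MILLISECONDS);
  }

  // Marks the session as closed; later checkpoints are discarded.
  synchronized void closeNow() {
    active.set(false);
  }

  synchronized List<String> acked() {
    return new ArrayList<>(acked);
  }

  public static void main(String[] args) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      DelayedCloseSketch reader = new DelayedCloseSketch(executor, 60_000L);
      reader.receive("a");
      reader.close();
      System.out.println("acked before timeout: " + reader.finalizeCheckpoint());
    } finally {
      executor.shutdownNow();
    }
  }
}
```

The trade-off is that any message received but not acknowledged before the delayed close is left for the broker to redeliver, which fits at-least-once delivery but can produce duplicates downstream.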





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r989857957


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;

Review Comment:
   I went back to the previous solution, so this is no longer relevant.



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(oldestMessageTimestamp);
+    return readerHash;

Review Comment:
   I went back to the previous solution, so this is no longer relevant.





[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r998732722


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +565,106 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout() throws IOException {
+    Duration closeTimeout = Duration.millis(2000L);
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+
+    ScheduledExecutorService mockScheduledExecutorService =
+        Mockito.mock(ScheduledExecutorService.class);
+    ExecutorOptions options = PipelineOptionsFactory.as(ExecutorOptions.class);
+    options.setScheduledExecutorService(mockScheduledExecutorService);
+    ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+    when(mockScheduledExecutorService.schedule(
+            runnableArgumentCaptor.capture(), anyLong(), any(TimeUnit.class)))
+        .thenReturn(null /* unused */);
+
+    JmsIO.UnboundedJmsReader reader = source.createReader(options, null);
+     reader.start();

Review Comment:
   ```suggestion
       reader.start();
   ```





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r996957968


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -582,29 +594,83 @@ public long getTotalBacklogBytes() {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
+      doClose();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    private void doClose() {
+
       try {
-        if (consumer != null) {
-          consumer.close();
-          consumer = null;
+        closeAutoscaler();
+        closeConsumer();
+        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Review Comment:
   How do we get the executor service from ExecutorOptions? As far as I can see, PipelineOptions would then need to be a field on the reader, but how do we get the ExecutorService from PipelineOptions?



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout()
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+
+    int closeTimeout = 2000;
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    reader.start();
+    reader.close();

Review Comment:
   How do we get the executor service from ExecutorOptions? As far as I can see, PipelineOptions would then need to be a field on the reader, but how do we get the ExecutorService from PipelineOptions?





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r977555690


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -463,23 +469,24 @@ public Coder<T> getOutputCoder() {
   static class UnboundedJmsReader<T> extends UnboundedReader<T> {
 
     private UnboundedJmsSource<T> source;
-    private JmsCheckpointMark checkpointMark;
     private Connection connection;
     private Session session;
     private MessageConsumer consumer;
     private AutoScaler autoScaler;
 
     private T currentMessage;
+    private Message currentJmsMessage;
     private Instant currentTimestamp;
 
-    public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
+    Set<Message> messagesToAck;

Review Comment:
   Hello,
   The messagesToAck set is cleared every time we checkpoint (in the getCheckpointMark method).
   Before this change, the same set was maintained in JmsCheckpointMark, so I don't think keeping it here is memory intensive.
   What do you think?





[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r995107311


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -582,29 +594,83 @@ public long getTotalBacklogBytes() {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
+      doClose();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    private void doClose() {
+
       try {
-        if (consumer != null) {
-          consumer.close();
-          consumer = null;
+        closeAutoscaler();
+        closeConsumer();
+        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Review Comment:
   Please use the scheduled executor service from ExecutorOptions once it is merged from https://github.com/apache/beam/pull/23234



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +376,10 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
       return builder().setAutoScaler(autoScaler).build();
     }
 
+    public Read<T> withCloseTimeout(long closeTimeout) {

Review Comment:
   Can we update this to use a duration object?
   
   ```suggestion
       public Read<T> withCloseTimeout(Duration closeTimeout) {
   ```



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -582,29 +594,83 @@ public long getTotalBacklogBytes() {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
+      doClose();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    private void doClose() {
+
       try {
-        if (consumer != null) {
-          consumer.close();
-          consumer = null;
+        closeAutoscaler();
+        closeConsumer();
+        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+        executorService.schedule(
+            () -> {
+              LOG.debug(
+                  "Closing session and connection after delay {}", source.spec.getCloseTimeout());
+              // Discard the checkpoints and set the reader as inactive
+              checkpointMark.discard();
+              closeSession();
+              closeConnection();
+            },
+            source.spec.getCloseTimeout(),
+            TimeUnit.MILLISECONDS);

Review Comment:
   Note to self: this should match the unit of the closeTimeout Duration object.



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +376,10 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
       return builder().setAutoScaler(autoScaler).build();
     }
 
+    public Read<T> withCloseTimeout(long closeTimeout) {
+      return builder().setCloseTimeout(closeTimeout).build();

Review Comment:
   ```suggestion
         if (closeTimeout.isNegative()) { throw new IllegalArgumentException("Close timeout must be non-negative."); }
         return builder().setCloseTimeout(closeTimeout).build();
   ```
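
As a rough illustration of the two points above (reject negative timeouts, and keep the scheduling unit consistent with the Duration), here is a minimal sketch. It assumes `java.time.Duration` as a stand-in for whichever Duration type the PR uses, and `CloseTimeoutSketch`, `validate`, and `scheduleClose` are hypothetical names that do not exist in JmsIO.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of JmsIO.
final class CloseTimeoutSketch {
  private CloseTimeoutSketch() {}

  // Rejects negative timeouts up front, mirroring the suggested check.
  static Duration validate(Duration closeTimeout) {
    if (closeTimeout.isNegative()) {
      throw new IllegalArgumentException("Close timeout must be non-negative.");
    }
    return closeTimeout;
  }

  // Converts the Duration to milliseconds and passes the matching TimeUnit,
  // so the delay value and its unit cannot drift apart.
  static ScheduledFuture<?> scheduleClose(
      ScheduledExecutorService executor, Runnable closeTask, Duration closeTimeout) {
    return executor.schedule(
        closeTask, validate(closeTimeout).toMillis(), TimeUnit.MILLISECONDS);
  }
}
```

Deriving both the delay and the unit from the same Duration in one place is what keeps them in sync; the caller never passes a raw long.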



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -76,15 +86,17 @@ Instant getOldestMessageTimestamp() {
   public void finalizeCheckpoint() {
     lock.writeLock().lock();
     try {
-      for (Message message : messages) {
-        try {
-          message.acknowledge();
-          Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
+      if (!discarded) {
+        for (Message message : messages) {
+          try {
+            message.acknowledge();
+            Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+            if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
+              oldestMessageTimestamp = currentMessageTimestamp;
+            }
+          } catch (Exception e) {
+            LOG.error("Exception while finalizing message: ", e);

Review Comment:
   Consider using a guard statement instead:
   
   ```suggestion
         if (discarded) {
           messages.clear();
           return;
         }
         for (Message message : messages) {
           try {
             message.acknowledge();
             Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
             if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
               oldestMessageTimestamp = currentMessageTimestamp;
             }
           } catch (Exception e) {
             LOG.error("Exception while finalizing message: ", e);
   ```
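
To make the guard-statement shape concrete, here is a small self-contained sketch. It is not the PR's code: `GuardedCheckpointSketch` is a hypothetical stand-in for JmsCheckpointMark, and a plain `Runnable` replaces `javax.jms.Message#acknowledge()` so the example needs no JMS dependency. Clearing the list after a successful finalize is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for JmsCheckpointMark; each Runnable models one ack.
final class GuardedCheckpointSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final List<Runnable> acks = new ArrayList<>();
  private boolean discarded = false; // guarded by lock

  void add(Runnable ack) {
    lock.writeLock().lock();
    try {
      if (discarded) {
        throw new IllegalStateException("Attempting to add to a discarded checkpoint.");
      }
      acks.add(ack);
    } finally {
      lock.writeLock().unlock();
    }
  }

  void discard() {
    lock.writeLock().lock();
    try {
      discarded = true;
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Returns how many acknowledgements were performed.
  int finalizeCheckpoint() {
    lock.writeLock().lock();
    try {
      // Guard: a discarded checkpoint drops its messages without acking them.
      if (discarded) {
        acks.clear();
        return 0;
      }
      int acked = 0;
      for (Runnable ack : acks) {
        ack.run();
        acked++;
      }
      acks.clear(); // sketch-only choice: forget messages once acked
      return acked;
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```

The early return keeps the acknowledgement loop at a single indentation level, which is the readability gain the guard is meant to give.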



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout()
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+
+    int closeTimeout = 2000;
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    reader.start();
+    reader.close();
+
+    boolean discarded = getDiscardedValue(reader);
+    assertFalse(discarded);
+    try {
+      Thread.sleep(closeTimeout + 1000);
+    } catch (InterruptedException ignored) {
+    }
+    discarded = getDiscardedValue(reader);
+    assertTrue(discarded);
+  }
+
+  @Test
+  public void testDiscardCheckpointMark() throws Exception {
+
+    Connection connection =
+        connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
+    for (int i = 0; i < 10; i++) {
+      producer.send(session.createTextMessage("test " + i));
+    }
+    producer.close();
+    session.close();
+    connection.close();
+
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE);
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    // start the reader and move to the first record
+    assertTrue(reader.start());
+
+    // consume 3 messages (NB: start already consumed the first message)
+    for (int i = 0; i < 3; i++) {
+      assertTrue(reader.advance());
+    }
+
+    // the messages are still pending in the queue (no ACK yet)
+    assertEquals(10, count(QUEUE));
+
+    // we finalize the checkpoint
+    reader.getCheckpointMark().finalizeCheckpoint();
+
+    // the checkpoint finalize ack the messages, and so they are not pending in the queue anymore
+    assertEquals(6, count(QUEUE));
+
+    // we read the 6 pending messages
+    for (int i = 0; i < 6; i++) {
+      assertTrue(reader.advance());
+    }
+
+    // still 6 pending messages as we didn't finalize the checkpoint
+    assertEquals(6, count(QUEUE));
+
+    // But here we discard the checkpoint
+    ((JmsCheckpointMark) reader.getCheckpointMark()).discard();
+    // we finalize the checkpoint: no more message in the queue
+    reader.getCheckpointMark().finalizeCheckpoint();
+
+    assertEquals(6, count(QUEUE));
+  }
+
+  private boolean getDiscardedValue(JmsIO.UnboundedJmsReader reader)
+      throws NoSuchFieldException, IllegalAccessException {
+    JmsCheckpointMark checkpoint = (JmsCheckpointMark) reader.getCheckpointMark();
+    Field privateField = JmsCheckpointMark.class.getDeclaredField("discarded");
+    privateField.setAccessible(true);
+    boolean discarded = (boolean) privateField.get(checkpoint);
+    return discarded;

Review Comment:
   There is no guarantee that this thread will see the latest value of the `discarded` field, since we don't acquire the lock that provides the memory barrier.
   
   This will work if you take my suggestion about making the fields package private:
   ```suggestion
       JmsCheckpointMark checkpoint = (JmsCheckpointMark) reader.getCheckpointMark();
       checkpoint.lock.readLock().lock();
       try {
         return checkpoint.discarded;
       } finally {
         checkpoint.lock.readLock().unlock();
       }
   ```
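
The visibility point can be shown in isolation. The sketch below assumes nothing about JmsCheckpointMark beyond a boolean flag guarded by a `ReentrantReadWriteLock`; `VisibleFlagSketch` and its method names are hypothetical. The writer sets the flag under the write lock, and the reader takes the read lock before reading, so the unlock/lock pair establishes a happens-before edge between the two threads.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical example class; only the locking pattern is the point here.
final class VisibleFlagSketch {
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  boolean discarded = false; // guarded by lock

  // Writer side: publish the flag under the write lock.
  void discard() {
    lock.writeLock().lock();
    try {
      discarded = true;
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Reader side: acquire the read lock so the latest write is visible.
  boolean isDiscarded() {
    lock.readLock().lock();
    try {
      return discarded;
    } finally {
      lock.readLock().unlock();
    }
  }
}
```

Reading the field through reflection skips the lock entirely, which is why the reflective helper gives no visibility guarantee even when it happens to pass.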



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -40,6 +40,7 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable
 
   private Instant oldestMessageTimestamp = Instant.now();
   private transient List<Message> messages = new ArrayList<>();
+  private transient boolean discarded = false;
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

Review Comment:
   It is common to make such fields/classes package-private and annotate them with `@VisibleForTesting`; then you don't need reflection to access the field.
   
   ```suggestion
     @VisibleForTesting
     transient boolean discarded = false;
   
     @VisibleForTesting
     final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   ```



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -67,6 +68,15 @@ Instant getOldestMessageTimestamp() {
     }

Review Comment:
   In `add(Message message)`, add the check right after acquiring the lock:
   ```
   if (discarded) {
     throw new IllegalStateException(String.format("Attempting to add message %s to checkpoint that is discarded.", message));
   }
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout()
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+
+    int closeTimeout = 2000;
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    reader.start();
+    reader.close();

Review Comment:
   I would suggest setting a mock ScheduledExecutorService on the PipelineOptions object used to create the reader. That way the test can inject the mock and capture the runnable/callable directly, without relying on Thread.sleep.
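
For readers unfamiliar with the capture-instead-of-sleep idea, here is a dependency-free sketch of it. It is an illustration only: `CapturingSchedulerSketch` is a hypothetical hand-written test double built on the JDK's `ScheduledThreadPoolExecutor`, used here in place of a mocking library. It records the scheduled task and its delay rather than running it, so a test can invoke the task directly.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical test double: records the scheduled task instead of running it.
final class CapturingSchedulerSketch extends ScheduledThreadPoolExecutor {
  Runnable captured;
  long capturedDelay;
  TimeUnit capturedUnit;

  CapturingSchedulerSketch() {
    super(1); // no worker thread is started until a task is actually queued
  }

  @Override
  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    captured = command;
    capturedDelay = delay;
    capturedUnit = unit;
    return null; // assumption: the code under test ignores the returned future
  }
}
```

A test would pass this scheduler to the code under test, assert that nothing has happened yet, call `captured.run()`, and then assert on the effect, with no timing dependence at all.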



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout()
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+
+    int closeTimeout = 2000;
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    reader.start();
+    reader.close();
+
+    boolean discarded = getDiscardedValue(reader);
+    assertFalse(discarded);
+    try {
+      Thread.sleep(closeTimeout + 1000);
+    } catch (InterruptedException ignored) {
+    }
+    discarded = getDiscardedValue(reader);
+    assertTrue(discarded);
+  }
+
+  @Test
+  public void testDiscardCheckpointMark() throws Exception {
+
+    Connection connection =
+        connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
+    for (int i = 0; i < 10; i++) {
+      producer.send(session.createTextMessage("test " + i));
+    }
+    producer.close();
+    session.close();
+    connection.close();
+
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE);
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    // start the reader and move to the first record
+    assertTrue(reader.start());
+
+    // consume 3 messages (NB: start already consumed the first message)

Review Comment:
   ```suggestion
       // consume 3 more messages (NB: start already consumed the first message)
   ```



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +376,10 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
       return builder().setAutoScaler(autoScaler).build();
     }
 
+    public Read<T> withCloseTimeout(long closeTimeout) {

Review Comment:
   /**
    * Sets the amount of time to wait for callbacks from the runner stating that the output has been durably persisted before closing the connection to the JMS broker. Any callbacks that do not occur will cause any unacknowledged messages to be returned to the JMS broker and redelivered to other clients.
    */



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout()
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+
+    int closeTimeout = 2000;
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    reader.start();
+    reader.close();
+
+    boolean discarded = getDiscardedValue(reader);
+    assertFalse(discarded);
+    try {
+      Thread.sleep(closeTimeout + 1000);
+    } catch (InterruptedException ignored) {
+    }
+    discarded = getDiscardedValue(reader);
+    assertTrue(discarded);
+  }
+
+  @Test
+  public void testDiscardCheckpointMark() throws Exception {
+
+    Connection connection =
+        connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
+    for (int i = 0; i < 10; i++) {
+      producer.send(session.createTextMessage("test " + i));
+    }
+    producer.close();
+    session.close();
+    connection.close();
+
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE);
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    // start the reader and move to the first record
+    assertTrue(reader.start());
+
+    // consume 3 messages (NB: start already consumed the first message)
+    for (int i = 0; i < 3; i++) {
+      assertTrue(reader.advance());
+    }
+
+    // the messages are still pending in the queue (no ACK yet)
+    assertEquals(10, count(QUEUE));
+
+    // we finalize the checkpoint
+    reader.getCheckpointMark().finalizeCheckpoint();
+
+    // the checkpoint finalize ack the messages, and so they are not pending in the queue anymore
+    assertEquals(6, count(QUEUE));
+
+    // we read the 6 pending messages
+    for (int i = 0; i < 6; i++) {
+      assertTrue(reader.advance());
+    }
+
+    // still 6 pending messages as we didn't finalize the checkpoint
+    assertEquals(6, count(QUEUE));
+
+    // But here we discard the checkpoint
+    ((JmsCheckpointMark) reader.getCheckpointMark()).discard();
+    // we finalize the checkpoint: no more message in the queue

Review Comment:
   ```suggestion
       // we finalize the checkpoint: no messages should be acked
   ```





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r995489854


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -582,29 +594,83 @@ public long getTotalBacklogBytes() {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
+      doClose();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    private void doClose() {
+
       try {
-        if (consumer != null) {
-          consumer.close();
-          consumer = null;
+        closeAutoscaler();
+        closeConsumer();
+        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Review Comment:
   Yes ok, I noticed that





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r984693480


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -123,11 +131,13 @@
 public class JmsIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
+  private static final long DEFAULT_CLOSE_TIMEOUT = 1000L;

Review Comment:
   Indeed no, the tests passed with 60s. We may be able to support a lower value, but I am not sure.





[GitHub] [beam] johnjcasey commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r974264735


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -463,23 +469,24 @@ public Coder<T> getOutputCoder() {
   static class UnboundedJmsReader<T> extends UnboundedReader<T> {
 
     private UnboundedJmsSource<T> source;
-    private JmsCheckpointMark checkpointMark;
     private Connection connection;
     private Session session;
     private MessageConsumer consumer;
     private AutoScaler autoScaler;
 
     private T currentMessage;
+    private Message currentJmsMessage;
     private Instant currentTimestamp;
 
-    public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
+    Set<Message> messagesToAck;

Review Comment:
   Keeping messages in memory seems like it could cause problems. Can we make this less memory intensive?





[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r997276356


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout()
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+
+    int closeTimeout = 2000;
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    reader.start();
+    reader.close();

Review Comment:
   `pipelineOptions.as(ExecutorOptions.class).getScheduledExecutorService()`, the key part being the [as(...)](https://beam.apache.org/releases/javadoc/2.41.0/org/apache/beam/sdk/options/PipelineOptionsFactory.html#as-java.lang.Class-) method





[GitHub] [beam] github-actions[bot] commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1230471198

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @lukecwik for label java.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r998509173


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +376,18 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
       return builder().setAutoScaler(autoScaler).build();
     }
 
+    /**
+     * Sets the amount of time to wait for callbacks from the runner stating that the output has
+     * been durably persisted before closing the connection to the JMS broker. Any callbacks that do
+     * not occur will cause any unacknowledged messages to be returned to the JMS broker and
+     * redelivered to other clients.

Review Comment:
   ```suggestion
        * not occur will cause unacknowledged messages to be returned to the JMS broker and redelivered
        * to other clients.
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -92,7 +98,8 @@ public class JmsIOTest {
   private ConnectionFactory connectionFactory;
   private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch;
 
-  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.fromOptions(createExecutorOptions());

Review Comment:
   We want to use the default for the existing tests:
   ```suggestion
     @Rule public final transient TestPipeline pipeline = TestPipeline.create();
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +561,111 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout() throws IOException {
+

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +561,111 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout() throws IOException {
+
+    Duration closeTimeout = Duration.millis(2000L);
+    long waitTimeout = closeTimeout.getMillis() + 1000L;
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+
+    ExecutorOptions options = createExecutorOptions();
+
+    JmsIO.UnboundedJmsReader reader = source.createReader(options, null);
+
+    reader.start();
+    reader.close();
+
+    boolean discarded = getDiscardedValue(reader);
+    assertFalse(discarded);

Review Comment:
   ```suggestion
       assertFalse(getDiscardedValue(reader));
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +561,111 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout() throws IOException {
+
+    Duration closeTimeout = Duration.millis(2000L);
+    long waitTimeout = closeTimeout.getMillis() + 1000L;
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+
+    ExecutorOptions options = createExecutorOptions();
+
+    JmsIO.UnboundedJmsReader reader = source.createReader(options, null);
+
+    reader.start();
+    reader.close();
+
+    boolean discarded = getDiscardedValue(reader);
+    assertFalse(discarded);
+    try {
+      options.getScheduledExecutorService().awaitTermination(waitTimeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ignored) {
+    }
+    discarded = getDiscardedValue(reader);
+    assertTrue(discarded);

Review Comment:
   ```suggestion
       assertTrue(getDiscardedValue(reader));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1282965693

   Updated to use a mock, now waiting for results of test run before merging.




[GitHub] [beam] lukecwik commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1283084459

   Run Java PreCommit




[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r996025383


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +557,101 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout()
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+
+    Duration closeTimeout = Duration.millis(2000L);
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    reader.start();
+    reader.close();
+
+    boolean discarded = getDiscardedValue(reader);
+    assertFalse(discarded);
+    try {
+      Thread.sleep(closeTimeout.getMillis() + 1000);
+    } catch (InterruptedException ignored) {
+    }
+    discarded = getDiscardedValue(reader);
+    assertTrue(discarded);
+  }
+
+  @Test
+  public void testDiscardCheckpointMark() throws Exception {
+

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -582,29 +594,83 @@ public long getTotalBacklogBytes() {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
+      doClose();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    private void doClose() {
+
       try {
-        if (consumer != null) {
-          consumer.close();
-          consumer = null;
+        closeAutoscaler();
+        closeConsumer();
+        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Review Comment:
   Use `pipelineOptions.as(ExecutorOptions.class).getScheduledExecutorService()`; the key part is the [as(...)](https://beam.apache.org/releases/javadoc/2.41.0/org/apache/beam/sdk/options/PipelineOptionsFactory.html#as-java.lang.Class-) method.





[GitHub] [beam] github-actions[bot] commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1238073495

   Reminder, please take a look at this pr: @lukecwik @chamikaramj 




[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r997820672


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout()
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+
+    int closeTimeout = 2000;
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+    JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+    reader.start();
+    reader.close();

Review Comment:
   Yes I get it. I don't know exactly how to mock it. 





[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r957532345


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(oldestMessageTimestamp);
+    return readerHash;

Review Comment:
   `return System.identityHashCode(reader);`?



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -38,33 +37,21 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable
 
   private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
 
-  private Instant oldestMessageTimestamp = Instant.now();
-  private transient List<Message> messages = new ArrayList<>();
+  private transient JmsIO.UnboundedJmsReader<?> reader;
+  private transient List<Message> messagesToAck;
+  private final int readerHash;

Review Comment:
   Try not to add or remove fields, as this will impact pipeline update: the JmsCheckpoints are saved in the runner, and the SerializableCoder used to encode them will complain about [local class incompatible](https://www.baeldung.com/java-serial-version-uid).
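
The compatibility concern can be illustrated with plain Java serialization, which is what a `Serializable`-based coder relies on. This is a minimal sketch, assuming a hypothetical `SketchCheckpoint` class (it is not the real `JmsCheckpointMark`). Transient fields are skipped on the wire and come back as null, and the class is matched by its `serialVersionUID`. When no UID is declared, Java derives one from the class shape, so changing the serialized fields typically changes it and previously saved payloads are rejected with `InvalidClassException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical checkpoint-like class used only for illustration. */
final class SketchCheckpoint implements Serializable {
  // An explicit UID replaces the one Java would derive from the class shape.
  private static final long serialVersionUID = 1L;

  // Serialized: part of the persisted form.
  private final long oldestTimestampMillis;
  // Transient: never written; the initializer does not run on deserialization.
  private transient List<String> pendingIds = new ArrayList<>();

  SketchCheckpoint(long oldestTimestampMillis) {
    this.oldestTimestampMillis = oldestTimestampMillis;
  }

  void addPending(String id) {
    pendingIds.add(id);
  }

  long oldestTimestampMillis() {
    return oldestTimestampMillis;
  }

  /** Returns null on an instance that was produced by deserialization. */
  List<String> pendingIds() {
    return pendingIds;
  }

  /** Serializes and deserializes the value in memory. */
  static SketchCheckpoint roundTrip(SketchCheckpoint in)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(in);
    }
    try (ObjectInputStream is =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (SketchCheckpoint) is.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    SketchCheckpoint original = new SketchCheckpoint(42L);
    original.addPending("m1");
    SketchCheckpoint copy = roundTrip(original);
    System.out.println("timestamp kept: " + (copy.oldestTimestampMillis() == 42L));
    System.out.println("transient list is null: " + (copy.pendingIds() == null));
  }
}
```

Whether pinning a UID is appropriate for the class under review is a separate decision; the sketch only shows why the set of non-transient fields is effectively part of the persisted format.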



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;

Review Comment:
   Wouldn't you want to ensure it is the same reader instance?
   
   (It is unlikely but hashes can collide)
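
One way to address both the `hashCode` and the `equals` comments is to compare the reader by reference and derive the hash from the same reference. The following is a minimal sketch, assuming a hypothetical `ReaderBoundMark` class with an `Object` standing in for the reader; it is not the code from this PR.

```java
/** Hypothetical mark whose identity is tied to one reader instance. */
final class ReaderBoundMark {
  // Stand-in for the reader reference held by the checkpoint mark.
  private final Object reader;

  ReaderBoundMark(Object reader) {
    this.reader = reader;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ReaderBoundMark)) {
      return false;
    }
    // Reference comparison: equal only when both marks hold the same instance,
    // so a hash collision between two readers cannot make them equal.
    return reader == ((ReaderBoundMark) o).reader;
  }

  @Override
  public int hashCode() {
    // Consistent with equals: the same instance always yields the same value.
    return System.identityHashCode(reader);
  }

  public static void main(String[] args) {
    Object readerA = new Object();
    Object readerB = new Object();
    System.out.println(new ReaderBoundMark(readerA).equals(new ReaderBoundMark(readerA)));
    System.out.println(new ReaderBoundMark(readerA).equals(new ReaderBoundMark(readerB)));
  }
}
```

Storing only an `int` hash loses the ability to tell two colliding readers apart, which is the point raised in the comment.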



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -463,23 +469,24 @@ public Coder<T> getOutputCoder() {
   static class UnboundedJmsReader<T> extends UnboundedReader<T> {
 
     private UnboundedJmsSource<T> source;
-    private JmsCheckpointMark checkpointMark;
     private Connection connection;
     private Session session;
     private MessageConsumer consumer;
     private AutoScaler autoScaler;
 
     private T currentMessage;
+    private Message currentJmsMessage;
     private Instant currentTimestamp;
 
-    public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
+    Set<Message> messagesToAck;

Review Comment:
   I would agree that this would continue to work as the previous solution did.
   
   Storing the message ids and acking them would be an improvement that could be considered in the future though.





[GitHub] [beam] github-actions[bot] commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1250060623

   Reminder, please take a look at this pr: @apilloud @johnjcasey 




[GitHub] [beam] lukecwik commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1279612816

   https://github.com/apache/beam/pull/23234 was merged, allowing you to use the ScheduledExecutorService from PipelineOptions.




[GitHub] [beam] lukecwik commented on pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22932:
URL: https://github.com/apache/beam/pull/22932#issuecomment-1281235698

   Note that the next release cut is this Wednesday, so if you're able to clean up this PR we could merge it and it would make its way into the 2.43 release; otherwise it will be another 6 weeks before the next release cut.




[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r998671565


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -36,6 +37,9 @@
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.concurrent.Executors;

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +560,108 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout() throws IOException {
+    Duration closeTimeout = Duration.millis(2000L);
+    long waitTimeout = closeTimeout.getMillis() + 1000L;
+    JmsIO.Read spec =
+        JmsIO.read()
+            .withConnectionFactory(connectionFactory)
+            .withUsername(USERNAME)
+            .withPassword(PASSWORD)
+            .withQueue(QUEUE)
+            .withCloseTimeout(closeTimeout);
+
+    JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+
+    ExecutorOptions options = createExecutorOptions();
+
+    JmsIO.UnboundedJmsReader reader = source.createReader(options, null);
+
+    reader.start();
+    reader.close();
+
+    assertFalse(getDiscardedValue(reader));
+    try {
+      options.getScheduledExecutorService().awaitTermination(waitTimeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ignored) {
+    }
+    assertTrue(getDiscardedValue(reader));
+  }
+
+  private ExecutorOptions createExecutorOptions() {
+    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+    ExecutorOptions options = PipelineOptionsFactory.create().as(ExecutorOptions.class);
+    options.setScheduledExecutorService(executorService);
+    return options;
+  }

Review Comment:
   ```suggestion
       ScheduledExecutorService mockScheduledExecutorService =
           Mockito.mock(ScheduledExecutorService.class);
       ExecutorOptions options = PipelineOptionsFactory.as(ExecutorOptions.class);
       options.setScheduledExecutorService(mockScheduledExecutorService);
       ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
       when(mockScheduledExecutorService.schedule(
               runnableArgumentCaptor.capture(), anyLong(), any(TimeUnit.class)))
           .thenReturn(null /* unused */);
   
       JmsIO.UnboundedJmsReader reader = source.createReader(options, null);
       reader.start();
       assertFalse(getDiscardedValue(reader));
       reader.close();
       assertFalse(getDiscardedValue(reader));
       verify(mockScheduledExecutorService)
           .schedule(any(Runnable.class), eq(closeTimeout.getMillis()), eq(TimeUnit.MILLISECONDS));
       runnableArgumentCaptor.getValue().run();
       assertTrue(getDiscardedValue(reader));
       verifyNoMoreInteractions(mockScheduledExecutorService);
     }
   ```
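
The idea behind the suggested test is to capture the scheduled task and run it by hand, so the test never sleeps. The same technique can be sketched with JDK classes only. In the sketch below, `CapturingScheduler` and `TimedCloseReader` are hypothetical names used for illustration, and the hand-written fake stands in for the Mockito mock and `ArgumentCaptor` from the suggestion.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Test double: records the scheduled task instead of running it later. */
final class CapturingScheduler extends ScheduledThreadPoolExecutor {
  Runnable captured;
  long capturedDelayMillis = -1L;

  CapturingScheduler() {
    super(1); // worker threads are created lazily, so none is started here
  }

  @Override
  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    captured = command;
    capturedDelayMillis = unit.toMillis(delay);
    return null; // the code under test in this sketch ignores the future
  }
}

/** Hypothetical code under test: close() defers the discard by a timeout. */
final class TimedCloseReader {
  private final ScheduledExecutorService scheduler;
  private final long closeTimeoutMillis;
  private volatile boolean discarded;

  TimedCloseReader(ScheduledExecutorService scheduler, long closeTimeoutMillis) {
    this.scheduler = scheduler;
    this.closeTimeoutMillis = closeTimeoutMillis;
  }

  void close() {
    Runnable discardTask = () -> discarded = true;
    scheduler.schedule(discardTask, closeTimeoutMillis, TimeUnit.MILLISECONDS);
  }

  boolean isDiscarded() {
    return discarded;
  }
}

final class TimedCloseDemo {
  public static void main(String[] args) {
    CapturingScheduler scheduler = new CapturingScheduler();
    TimedCloseReader reader = new TimedCloseReader(scheduler, 2000L);
    reader.close();
    System.out.println("discarded right after close: " + reader.isDiscarded());
    scheduler.captured.run(); // simulate the timeout elapsing
    System.out.println("discarded after task ran: " + reader.isDiscarded());
    scheduler.shutdownNow();
  }
}
```

Running the captured task directly makes the test deterministic and removes the multi-second wait that the `Thread.sleep` and `awaitTermination` versions needed.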



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -67,6 +72,7 @@
 import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.joda.time.Duration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;

Review Comment:
   ```suggestion
   import org.junit.Rule;
   import org.mockito.ArgumentCaptor;
   import org.mockito.Mockito;
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +560,108 @@ public void testCustomAutoscaler() throws IOException {
     verify(autoScaler, times(1)).stop();
   }
 
+  @Test
+  public void testCloseWithTimeout() throws IOException {
+    Duration closeTimeout = Duration.millis(2000L);
+    long waitTimeout = closeTimeout.getMillis() + 1000L;

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -21,6 +21,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

Review Comment:
   ```suggestion
   import static org.junit.Assert.fail;
   import static org.mockito.ArgumentMatchers.any;
   import static org.mockito.ArgumentMatchers.anyLong;
   import static org.mockito.ArgumentMatchers.eq;
   import static org.mockito.Mockito.verifyNoMoreInteractions;
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -92,7 +98,7 @@ public class JmsIOTest {
   private ConnectionFactory connectionFactory;
   private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch;
 
-  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public final transient TestPipeline pipeline = TestPipeline.fromOptions();

Review Comment:
   ```suggestion
     @Rule public final transient TestPipeline pipeline = TestPipeline.create();
   ```





[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r984038907


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +58,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {

Review Comment:
   There is still a race here where reader.active.get() is true right now but becomes false while processing. I think you'll want to use a lock or synchronize on a common object.
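
The check-then-act race described here can be closed by making the activity check and the acknowledgement loop one critical section that `close()` also has to enter. The following is a minimal sketch, assuming a hypothetical `GuardedReader` with message ids as plain strings; the real reader would acknowledge JMS messages inside the guarded block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical reader whose state is guarded by one common lock object. */
final class GuardedReader {
  private final Object lock = new Object();
  private boolean active = true; // guarded by lock
  private final List<String> acked = new ArrayList<>(); // guarded by lock

  /**
   * Acknowledges all ids or none. Because close() takes the same lock, the
   * reader cannot become inactive halfway through the loop.
   */
  boolean ackAll(List<String> messageIds) {
    synchronized (lock) {
      if (!active) {
        return false;
      }
      for (String id : messageIds) {
        acked.add(id); // stand-in for message.acknowledge()
      }
      return true;
    }
  }

  /** Blocks until any in-progress ackAll() call has finished. */
  void close() {
    synchronized (lock) {
      active = false;
    }
  }

  List<String> acked() {
    synchronized (lock) {
      return new ArrayList<>(acked);
    }
  }

  public static void main(String[] args) {
    GuardedReader reader = new GuardedReader();
    System.out.println("before close: " + reader.ackAll(Arrays.asList("m1", "m2")));
    reader.close();
    System.out.println("after close: " + reader.ackAll(Arrays.asList("m3")));
    System.out.println("acked count: " + reader.acked().size());
  }
}
```

An `AtomicBoolean` alone makes each read of the flag safe, but it does not stop the flag from flipping between the read and the work that depends on it.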
   



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -582,29 +608,85 @@ public long getTotalBacklogBytes() {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
+      doClose();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    private void doClose() {
+      if (active.get()) {
+        try {
+          closeAutoscaler();
+          closeConsumer();
+          ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Review Comment:
   Once https://github.com/apache/beam/pull/23234/files gets merged then you can use the `ScheduledExecutorService` from `ExecutorOptions` instead of creating one.
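
The design point is that the reader receives an executor it does not own, instead of creating a thread on every `close()`. The following is a rough JDK-only sketch, assuming a hypothetical `DeferredCloseReader`; obtaining the executor from Beam's `ExecutorOptions` is not shown, and the caller-provided executor stands in for it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical reader that defers its cleanup on a caller-owned executor. */
final class DeferredCloseReader {
  private final ScheduledExecutorService scheduler; // shared, not owned here
  private final long closeTimeoutMillis;
  private final CountDownLatch discarded = new CountDownLatch(1);

  DeferredCloseReader(ScheduledExecutorService scheduler, long closeTimeoutMillis) {
    this.scheduler = scheduler;
    this.closeTimeoutMillis = closeTimeoutMillis;
  }

  /** Returns immediately; the cleanup task runs after the timeout. */
  void close() {
    Runnable discardTask = discarded::countDown;
    scheduler.schedule(discardTask, closeTimeoutMillis, TimeUnit.MILLISECONDS);
  }

  boolean isDiscarded() {
    return discarded.getCount() == 0;
  }

  boolean awaitDiscarded(long timeout, TimeUnit unit) throws InterruptedException {
    return discarded.await(timeout, unit);
  }

  public static void main(String[] args) throws InterruptedException {
    // One executor serves every reader; the owner shuts it down once.
    ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
    try {
      DeferredCloseReader first = new DeferredCloseReader(shared, 50L);
      DeferredCloseReader second = new DeferredCloseReader(shared, 50L);
      first.close();
      second.close();
      System.out.println("first discarded: " + first.awaitDiscarded(5, TimeUnit.SECONDS));
      System.out.println("second discarded: " + second.awaitDiscarded(5, TimeUnit.SECONDS));
    } finally {
      shared.shutdown();
    }
  }
}
```

Injecting the executor also lets a test substitute a fake or a mock for it.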



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -123,11 +131,13 @@
 public class JmsIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
+  private static final long DEFAULT_CLOSE_TIMEOUT = 1000L;

Review Comment:
   Is one second appropriate?



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +58,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {

Review Comment:
   Instead of having access to internal reader details, consider exposing this package private method on the reader:
   ```
   public void ackMessages(List<Message> messages) {
   ...
   }
   ```
   
   This way the internal details of how the watermark advances and thread-safety can be private within the reader.
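
A rough sketch of that shape, assuming hypothetical `StubMessage`, `AckingReader`, and `DelegatingMark` types (the real code would use `javax.jms.Message` and the existing reader and checkpoint classes): the checkpoint mark only hands over its list, and the reader alone decides how acknowledging advances the watermark.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a JMS message. */
final class StubMessage {
  final long timestampMillis;
  boolean acknowledged;

  StubMessage(long timestampMillis) {
    this.timestampMillis = timestampMillis;
  }

  void acknowledge() {
    acknowledged = true;
  }
}

/** Hypothetical reader that keeps watermark handling private. */
final class AckingReader {
  private final AtomicLong watermarkMillis = new AtomicLong(Long.MIN_VALUE);

  /** Package-private entry point for the checkpoint mark. */
  void ackMessages(List<StubMessage> messages) {
    for (StubMessage message : messages) {
      message.acknowledge();
      // The watermark only moves forward, to the newest acknowledged timestamp.
      watermarkMillis.accumulateAndGet(message.timestampMillis, Math::max);
    }
  }

  long watermarkMillis() {
    return watermarkMillis.get();
  }
}

/** Hypothetical checkpoint mark that no longer touches reader internals. */
final class DelegatingMark {
  private final AckingReader reader;
  private final List<StubMessage> messagesToAck;

  DelegatingMark(AckingReader reader, List<StubMessage> messagesToAck) {
    this.reader = reader;
    this.messagesToAck = messagesToAck;
  }

  void finalizeCheckpoint() {
    reader.ackMessages(messagesToAck);
  }

  public static void main(String[] args) {
    AckingReader reader = new AckingReader();
    List<StubMessage> batch = Arrays.asList(new StubMessage(300L), new StubMessage(100L));
    new DelegatingMark(reader, batch).finalizeCheckpoint();
    System.out.println("watermark: " + reader.watermarkMillis());
  }
}
```

Any locking needed against a concurrent `close()` would then live inside `ackMessages` as well, invisible to the checkpoint mark.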



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +381,10 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
       return builder().setAutoScaler(autoScaler).build();
     }
 
+    public Read<T> withCloseTimeout(long closeTimeout) {

Review Comment:
   Do you want to go with a wait on how long we keep sessions around?
   ```suggestion
       /** The amount of time to wait before a message that was read has its acknowledgement expire and be returned back to the JMS broker. */
       public Read<T> withMessageAckExpiry(Duration ackExpiry) {
   ```
   
   If not then I would suggest updating this to:
   ```suggestion
       /** The amount of time to wait for bundle finalization callbacks allowing for messages to be acknowledged after the reader is closed. */
       public Read<T> withConnectionCloseTimeout(Duration closeTimeout) {
   ```





[GitHub] [beam] rvballada commented on a diff in pull request #22932: Io jms fix ack message checkpoint

Posted by GitBox <gi...@apache.org>.
rvballada commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r984697468


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +381,10 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
       return builder().setAutoScaler(autoScaler).build();
     }
 
+    public Read<T> withCloseTimeout(long closeTimeout) {

Review Comment:
   Yes, we want to wait before closing the session, and then the connection. Closing the session will indicate to the broker that all unacknowledged messages must be kept in the queue to be redelivered. 
   I don't know which is best: withSessionConnectionCloseTimeout, withSessionCloseTimeout....


