Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/26 16:01:41 UTC

[GitHub] [beam] lukecwik commented on a diff in pull request #22932: Io jms fix ack message checkpoint

lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r957532345


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(oldestMessageTimestamp);
+    return readerHash;

Review Comment:
   `return System.identityHashCode(reader);`?



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -38,33 +37,21 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable
 
   private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
 
-  private Instant oldestMessageTimestamp = Instant.now();
-  private transient List<Message> messages = new ArrayList<>();
+  private transient JmsIO.UnboundedJmsReader<?> reader;
+  private transient List<Message> messagesToAck;
+  private final int readerHash;

Review Comment:
   Try not to add or remove fields, as this will impact pipeline update: the `JmsCheckpointMark` instances are saved in the runner, and the `SerializableCoder` that is used to encode them will complain about [local class incompatible](https://www.baeldung.com/java-serial-version-uid).
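
To make the serialization concern above concrete, here is a minimal sketch using plain Java serialization. `SerializationSketch` and `CheckpointSketch` are hypothetical names, not Beam classes, and the explicit `serialVersionUID` is shown only as one common mitigation, not as what this PR should necessarily do. When no UID is declared, the JVM derives one from the class shape, including its fields (private transient and private static fields are excluded), which is why adding a field such as `readerHash` can trigger the "local class incompatible" error when old checkpoints are decoded.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Round-trips a checkpoint-like object through Java serialization. */
class SerializationSketch {

  /** Hypothetical stand-in for a checkpoint mark; not the Beam class. */
  static final class CheckpointSketch implements Serializable {
    // Declared explicitly, so the JVM does not derive a UID from the class shape.
    private static final long serialVersionUID = 1L;

    // Part of the serialized form.
    private final long oldestTimestampMillis;

    // Private transient: skipped by serialization, so it is null after decoding.
    private transient List<String> pendingAcks;

    CheckpointSketch(long oldestTimestampMillis, List<String> pendingAcks) {
      this.oldestTimestampMillis = oldestTimestampMillis;
      this.pendingAcks = pendingAcks;
    }

    long oldestTimestampMillis() {
      return oldestTimestampMillis;
    }

    List<String> pendingAcks() {
      return pendingAcks;
    }
  }

  /** Serializes and deserializes the value in memory. */
  static CheckpointSketch roundTrip(CheckpointSketch in) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(in);
      }
      try (ObjectInputStream ois =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CheckpointSketch) ois.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }

  /** Returns the UID that the serialization runtime uses for the sketch class. */
  static long declaredUid() {
    return ObjectStreamClass.lookup(CheckpointSketch.class).getSerialVersionUID();
  }

  public static void main(String[] args) {
    CheckpointSketch restored = roundTrip(new CheckpointSketch(42L, new ArrayList<>()));
    System.out.println("timestamp kept: " + (restored.oldestTimestampMillis() == 42L));
    System.out.println("transient dropped: " + (restored.pendingAcks() == null));
    System.out.println("uid: " + declaredUid());
  }
}
```

The transient list does not survive the round trip, so any code that runs after decoding has to tolerate a `null` there.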



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;

Review Comment:
   Wouldn't you want to ensure it is the same reader instance?
   
   (It is unlikely, but hashes can collide.)
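
A minimal sketch of what comparing by reader instance could look like, with `System.identityHashCode` used for `hashCode()` as suggested in the earlier comment. `IdentityEqualitySketch` and `MarkSketch` are hypothetical names, and the sketch deliberately ignores that the real `reader` field is transient and is set to `null` after finalization, which the actual class would have to handle.

```java
/** Compares checkpoint-like objects by reader identity rather than by a stored hash. */
class IdentityEqualitySketch {

  /** Hypothetical stand-in for a checkpoint mark; not the Beam class. */
  static final class MarkSketch {
    private final Object reader;

    MarkSketch(Object reader) {
      this.reader = reader;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof MarkSketch)) {
        return false;
      }
      // Reference comparison: true only when both marks hold the very same reader instance.
      return reader == ((MarkSketch) o).reader;
    }

    @Override
    public int hashCode() {
      // Consistent with equals: the same instance always yields the same identity hash.
      return System.identityHashCode(reader);
    }
  }

  /** Two distinct strings whose hash codes are equal, showing that equal hashes prove nothing. */
  static boolean distinctValuesShareHash() {
    return !"Aa".equals("BB") && "Aa".hashCode() == "BB".hashCode();
  }

  public static void main(String[] args) {
    Object readerA = new Object();
    Object readerB = new Object();
    System.out.println(new MarkSketch(readerA).equals(new MarkSketch(readerA))); // same reader
    System.out.println(new MarkSketch(readerA).equals(new MarkSketch(readerB))); // other reader
    System.out.println(distinctValuesShareHash());
  }
}
```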



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -463,23 +469,24 @@ public Coder<T> getOutputCoder() {
   static class UnboundedJmsReader<T> extends UnboundedReader<T> {
 
     private UnboundedJmsSource<T> source;
-    private JmsCheckpointMark checkpointMark;
     private Connection connection;
     private Session session;
     private MessageConsumer consumer;
     private AutoScaler autoScaler;
 
     private T currentMessage;
+    private Message currentJmsMessage;
     private Instant currentTimestamp;
 
-    public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
+    Set<Message> messagesToAck;

Review Comment:
   I would agree that this would continue to work as the previous solution did.
   
   Storing the message IDs and acking them would be an improvement that could be considered in the future, though.
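
One way the ID-based idea could be sketched is below, assuming the reader keeps the live messages and the checkpoint carries only their IDs. Everything here is hypothetical: `AckableMessage` stands in for `javax.jms.Message` (whose `getJMSMessageID()` and `acknowledge()` it mirrors), and `PendingAcks` and `RecordingMessage` are illustration-only names, not part of JmsIO.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Tracks unacknowledged messages by ID so a checkpoint only needs to carry strings. */
class AckByIdSketch {

  /** Hypothetical minimal message type; stands in for javax.jms.Message. */
  interface AckableMessage {
    String id();

    void acknowledge();
  }

  /** Test double that records whether it was acknowledged. */
  static final class RecordingMessage implements AckableMessage {
    private final String id;
    private boolean acked;

    RecordingMessage(String id) {
      this.id = id;
    }

    @Override
    public String id() {
      return id;
    }

    @Override
    public void acknowledge() {
      acked = true;
    }

    boolean isAcked() {
      return acked;
    }
  }

  /** Reader-side registry of messages that were read but not yet acknowledged. */
  static final class PendingAcks {
    private final Map<String, AckableMessage> pending = new LinkedHashMap<>();

    synchronized void track(AckableMessage message) {
      pending.put(message.id(), message);
    }

    /** Returns the IDs tracked so far; a checkpoint would hold only this list. */
    synchronized List<String> snapshotIds() {
      return new ArrayList<>(pending.keySet());
    }

    /** Acknowledges and forgets each listed ID that is still pending; returns the count. */
    synchronized int ackAll(List<String> ids) {
      int acked = 0;
      for (String id : ids) {
        AckableMessage message = pending.remove(id);
        if (message != null) {
          message.acknowledge();
          acked++;
        }
      }
      return acked;
    }
  }

  public static void main(String[] args) {
    PendingAcks pending = new PendingAcks();
    pending.track(new RecordingMessage("ID:1"));
    pending.track(new RecordingMessage("ID:2"));
    List<String> checkpointIds = pending.snapshotIds();
    pending.track(new RecordingMessage("ID:3")); // read after the checkpoint was taken
    System.out.println("acked: " + pending.ackAll(checkpointIds));
  }
}
```

Because the checkpoint holds only strings, finalizing the same checkpoint twice is harmless in this sketch: the second call finds nothing left to acknowledge.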


