Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/31 16:35:29 UTC

[GitHub] [beam] dxichen opened a new pull request, #22976: Buffered process time timers

dxichen opened a new pull request, #22976:
URL: https://github.com/apache/beam/pull/22976

   Addresses #22975
   
   The added code includes a new KeyedTimerData format to be saved, in order to facilitate migration to the new buffered format. The change is fully backwards compatible, as it still maintains the previous state format for processing-time timers.
   
   Furthermore, with this change PROCESSING_TIME timers no longer use the TimerRegistry directly; instead, the ready timers are passed along on watermark, similar to the EVENT_TIME timers.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] dxichen commented on a diff in pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
dxichen commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r966279006


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -191,11 +193,26 @@ public Collection<KeyedTimerData<K>> removeReadyTimers() {
         state.reloadEventTimeTimers();
       }
     }
+
+    // Flush all timers for process time
+    final Iterator<KeyedTimerData<K>> processBufferIterator = processTimeBuffer.iterator();
+    while (processBufferIterator.hasNext() && readyTimers.size() < maxReadyTimersToProcessOnce) {
+      KeyedTimerData<K> processTimerData = processBufferIterator.next();
+      readyTimers.add(processTimerData);
+      processBufferIterator.remove();
+      state.deletePersisted(processTimerData);

Review Comment:
   Because Samza checkpoints state through the changelog/blobstore, and state on restart after a failure is read from the checkpoint, any persisted deletes will be rolled back.





[GitHub] [beam] mynameborat commented on a diff in pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
mynameborat commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r959805133


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -545,57 +596,58 @@ private void reloadEventTimeTimers() {
       }
     }
 
-    private void loadProcessingTimeTimers() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
-          processingTimeTimerState.readIterator().read();
-      // since the iterator will reach to the end, it will be closed automatically
-      int count = 0;
-      while (iter.hasNext()) {
-        final Map.Entry<TimerKey<K>, Long> entry = iter.next();
-        final KeyedTimerData keyedTimerData =
-            TimerKey.toKeyedTimerData(
-                entry.getKey(), entry.getValue(), TimeDomain.PROCESSING_TIME, keyCoder);
-
-        timerRegistry.schedule(
-            keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
-        ++count;
+    private void reloadProcessingTimeTimers() {
+      final Iterator<KeyedTimerData<K>> iter =
+          timestampSortedProcessTimeTimerState.readIterator().read();
+
+      while (iter.hasNext() && processTimeBuffer.size() < maxProcessTimerBufferSize) {
+        final KeyedTimerData keyedTimerData = iter.next();
+        processTimeBuffer.add(keyedTimerData);
       }
-      processingTimeTimerState.closeIterators();
 
-      LOG.info("Loaded {} processing time timers in memory", count);
+      timestampSortedProcessTimeTimerState.closeIterators();
+      LOG.info("Loaded {} processing time timers in memory", processTimeBuffer.size());
     }
 
-    /**
-     * Restore timer state from RocksDB. This is needed for migration of existing jobs. Give events
-     * in eventTimeTimerState, construct timestampSortedEventTimeTimerState preparing for memory
-     * reloading. TO-DO: processing time timers are still loaded into memory in one shot; will apply
-     * the same optimization mechanism as event time timer
-     */
+    /** Restore timer state from RocksDB. */
     private void init() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> eventTimersIter =
-          eventTimeTimerState.readIterator().read();
-      // use hasNext to check empty, because this is relatively cheap compared with Iterators.size()
-      if (eventTimersIter.hasNext()) {
-        final Iterator sortedEventTimerIter =
-            timestampSortedEventTimeTimerState.readIterator().read();
-
-        if (!sortedEventTimerIter.hasNext()) {
-          // inline the migration code
-          while (eventTimersIter.hasNext()) {
-            final Map.Entry<TimerKey<K>, Long> entry = eventTimersIter.next();
-            final KeyedTimerData<K> keyedTimerData =
-                TimerKey.toKeyedTimerData(
-                    entry.getKey(), entry.getValue(), TimeDomain.EVENT_TIME, keyCoder);
-            timestampSortedEventTimeTimerState.add(keyedTimerData);
-          }
-        }
-        timestampSortedEventTimeTimerState.closeIterators();
-      }
-      eventTimeTimerState.closeIterators();
+      migrateToKeyedTimerState(
+          eventTimeTimerState, timestampSortedEventTimeTimerState, TimeDomain.EVENT_TIME);
+      migrateToKeyedTimerState(
+          processingTimeTimerState,
+          timestampSortedProcessTimeTimerState,
+          TimeDomain.PROCESSING_TIME);
 
       reloadEventTimeTimers();
-      loadProcessingTimeTimers();
+      reloadProcessingTimeTimers();
+    }
+  }
+
+  /**
+   * This is needed for migration of existing jobs. Give events in timerState, construct
+   * keyedTimerState preparing for memory reloading.
+   */
+  private void migrateToKeyedTimerState(
+      SamzaMapState<TimerKey<K>, Long> timerState,
+      SamzaSetState<KeyedTimerData<K>> keyedTimerState,
+      TimeDomain timeDomain) {
+    final Iterator<Map.Entry<TimerKey<K>, Long>> timersIter = timerState.readIterator().read();
+    // use hasNext to check empty, because this is relatively cheap compared with Iterators.size()
+    if (timersIter.hasNext()) {
+      final Iterator keyedTimerIter = keyedTimerState.readIterator().read();
+
+      if (!keyedTimerIter.hasNext()) {
+        // Migrate from timerState to keyedTimerState
+        while (timersIter.hasNext()) {
+          final Map.Entry<TimerKey<K>, Long> entry = timersIter.next();
+          final KeyedTimerData<K> keyedTimerData =
+              TimerKey.toKeyedTimerData(entry.getKey(), entry.getValue(), timeDomain, keyCoder);
+          keyedTimerState.add(keyedTimerData);
+        }
+      }
+      keyedTimerState.closeIterators();

Review Comment:
   Will this code go away after migration? If so, can we add a TODO and attach a JIRA ticket? 



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -545,57 +596,58 @@ private void reloadEventTimeTimers() {
       }
     }
 
-    private void loadProcessingTimeTimers() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
-          processingTimeTimerState.readIterator().read();
-      // since the iterator will reach to the end, it will be closed automatically
-      int count = 0;
-      while (iter.hasNext()) {
-        final Map.Entry<TimerKey<K>, Long> entry = iter.next();
-        final KeyedTimerData keyedTimerData =
-            TimerKey.toKeyedTimerData(
-                entry.getKey(), entry.getValue(), TimeDomain.PROCESSING_TIME, keyCoder);
-
-        timerRegistry.schedule(
-            keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
-        ++count;
+    private void reloadProcessingTimeTimers() {
+      final Iterator<KeyedTimerData<K>> iter =
+          timestampSortedProcessTimeTimerState.readIterator().read();
+
+      while (iter.hasNext() && processTimeBuffer.size() < maxProcessTimerBufferSize) {
+        final KeyedTimerData keyedTimerData = iter.next();
+        processTimeBuffer.add(keyedTimerData);
       }
-      processingTimeTimerState.closeIterators();
 
-      LOG.info("Loaded {} processing time timers in memory", count);
+      timestampSortedProcessTimeTimerState.closeIterators();
+      LOG.info("Loaded {} processing time timers in memory", processTimeBuffer.size());

Review Comment:
   Need to close the iterator since the loop can exit early
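
   A minimal sketch of the pattern being asked for, assuming the store iterator holds a resource until it is closed. `TrackingIterator` and `reload` are hypothetical names used only for illustration; they are not Samza or Beam APIs, and the sketch does not settle whether `closeIterators()` in the PR already covers this case.

```java
import java.util.Iterator;
import java.util.NavigableSet;

final class EarlyExitCloseSketch {

  /** Hypothetical stand-in for a store-backed iterator that holds a resource until closed. */
  static final class TrackingIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> delegate;
    private boolean closed;

    TrackingIterator(Iterator<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
      return delegate.hasNext();
    }

    @Override
    public T next() {
      return delegate.next();
    }

    @Override
    public void close() {
      closed = true;
    }

    boolean isClosed() {
      return closed;
    }
  }

  /** Copies elements into the buffer until it holds maxSize, then closes the iterator. */
  static <T> void reload(TrackingIterator<T> iter, NavigableSet<T> buffer, int maxSize) {
    try {
      // The size check can end the loop before the iterator is exhausted.
      while (iter.hasNext() && buffer.size() < maxSize) {
        buffer.add(iter.next());
      }
    } finally {
      // Runs on normal completion, early exit, and exceptions alike.
      iter.close();
    }
  }
}
```

   With a five-element source and `maxSize` of 3, the loop stops with two elements unread, and the `finally` block still closes the iterator.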



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -218,6 +235,10 @@ public Instant getOutputWatermark() {
   // for unit test only
   NavigableSet<KeyedTimerData<K>> getEventTimeBuffer() {
     return eventTimeBuffer;
+  } // todo dchen1

Review Comment:
   remove `//todo dchen1`



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -444,6 +470,16 @@ private class SamzaTimerState {
                           new TimerKeyCoder<>(keyCoder, windowCoder),
                           VarLongCoder.of()));
 
+      this.timestampSortedProcessTimeTimerState =
+          (SamzaSetState<KeyedTimerData<K>>)
+              nonKeyedStateInternalsFactory
+                  .stateInternalsForKey(null)
+                  .state(
+                      StateNamespaces.global(),
+                      StateTags.set(
+                          timerStateId + "-pts",

Review Comment:
   It looks like we don't have a consistent naming scheme anyway and can't change `-ts` to `-ets`. Maybe use something more descriptive than `pts` to help readability and debuggability.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -307,7 +327,11 @@ public void setTimer(TimerData timerData) {
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          // Append to buffer iff not full
+          if (processTimeBuffer.size() < maxProcessTimerBufferSize) {
+            processTimeBuffer.add(keyedTimerData);
+          }

Review Comment:
   Do we need to do anything when the buffer has reached its maximum size, as in the event-time scenario?
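
   For reference, a rough sketch of how the event-time path (as it stood before this PR) handles a full buffer, reduced to bare timestamps. `BoundedTimerBufferSketch` is a hypothetical class, `Long` stands in for `KeyedTimerData`, and persisting to the state store is omitted; this illustrates the eviction idea and is not the proposed processing-time implementation.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

final class BoundedTimerBufferSketch {
  private final NavigableSet<Long> buffer = new TreeSet<>();
  private final int capacity;

  // Long.MAX_VALUE means nothing has been evicted yet, so the buffer holds every timer.
  private long maxTimestampInBuffer = Long.MAX_VALUE;

  BoundedTimerBufferSketch(int capacity) {
    if (capacity < 1) {
      throw new IllegalArgumentException("capacity must be at least 1");
    }
    this.capacity = capacity;
  }

  /** Buffers the timestamp only if it sorts before the largest buffered one. */
  void setTimer(long timestamp) {
    // The real code persists the timer to the state store before this point.
    if (timestamp < maxTimestampInBuffer) {
      buffer.add(timestamp);
      if (buffer.size() > capacity) {
        // Over capacity: drop the latest timer from memory; it remains in the store.
        buffer.pollLast();
        maxTimestampInBuffer = buffer.last();
      }
    }
  }

  NavigableSet<Long> buffer() {
    return buffer;
  }
}
```

   With a capacity of 2, setting timers at 10, 20, 5, 15 and 7 leaves 5 and 7 in the buffer: 20 and then 10 are evicted, and 15 is never buffered because it is not earlier than the buffered maximum at that point.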



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,18 +302,15 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries

Review Comment:
   Why is this deleted? Does it no longer apply to event time?



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -191,11 +193,26 @@ public Collection<KeyedTimerData<K>> removeReadyTimers() {
         state.reloadEventTimeTimers();
       }
     }
+
+    // Flush all timers for process time
+    final Iterator<KeyedTimerData<K>> processBufferIterator = processTimeBuffer.iterator();
+    while (processBufferIterator.hasNext() && readyTimers.size() < maxReadyTimersToProcessOnce) {
+      KeyedTimerData<K> processTimerData = processBufferIterator.next();
+      readyTimers.add(processTimerData);
+      processBufferIterator.remove();
+      state.deletePersisted(processTimerData);

Review Comment:
   Given we persist the deletes for the process timers, are the ready timers persisted themselves in case of failures? 





[GitHub] [beam] github-actions[bot] commented on pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22976:
URL: https://github.com/apache/beam/pull/22976#issuecomment-1367900436

   This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.




[GitHub] [beam] dxichen commented on a diff in pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
dxichen commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r959837551


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -444,6 +470,16 @@ private class SamzaTimerState {
                           new TimerKeyCoder<>(keyCoder, windowCoder),
                           VarLongCoder.of()));
 
+      this.timestampSortedProcessTimeTimerState =
+          (SamzaSetState<KeyedTimerData<K>>)
+              nonKeyedStateInternalsFactory
+                  .stateInternalsForKey(null)
+                  .state(
+                      StateNamespaces.global(),
+                      StateTags.set(
+                          timerStateId + "-pts",

Review Comment:
   Good point, will rename it to `process-timers-sorted`.





[GitHub] [beam] dxichen commented on pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
dxichen commented on PR #22976:
URL: https://github.com/apache/beam/pull/22976#issuecomment-1242520385

   Run Samza ValidatesRunner




[GitHub] [beam] github-actions[bot] closed pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #22976: Buffered process time timers
URL: https://github.com/apache/beam/pull/22976




[GitHub] [beam] kw2542 commented on a diff in pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
kw2542 commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r966260499


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java:
##########
@@ -80,7 +80,13 @@ public interface SamzaPipelineOptions extends PipelineOptions {
   @Default.Integer(50000)
   int getEventTimerBufferSize();
 
-  void setEventTimerBufferSize(int eventTimerBufferSize);
+  void setEventTimerBufferSize(int processingTimerBufferSize);

Review Comment:
   argument should be named `eventTimerBufferSize`





[GitHub] [beam] github-actions[bot] commented on pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22976:
URL: https://github.com/apache/beam/pull/22976#issuecomment-1233221704

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] kw2542 commented on pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
kw2542 commented on PR #22976:
URL: https://github.com/apache/beam/pull/22976#issuecomment-1244102358

   Run Java Samza PortableValidatesRunner




[GitHub] [beam] dxichen commented on pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
dxichen commented on PR #22976:
URL: https://github.com/apache/beam/pull/22976#issuecomment-1233169565

   R: @xinyuiscool 




[GitHub] [beam] dxichen commented on pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
dxichen commented on PR #22976:
URL: https://github.com/apache/beam/pull/22976#issuecomment-1240997659

   Run Java PreCommit




[GitHub] [beam] xinyuiscool commented on a diff in pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r982688831


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {

Review Comment:
   `eventTimeBuffer.last()` will throw an exception if the set is empty, right?
   
   The updated code is far less readable than the previous condition. I have no idea what's going on in such a complex nested if statement. @dxichen: please think about how others can understand this piece of code and maintain it in the long run. The changes need to be very readable and easy to debug. Otherwise they will increase maintenance overhead.
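
   One possible shape for a more readable version, as a sketch only. It names each sub-condition and guards the empty-set case, since `last()` on an empty `TreeSet` throws `NoSuchElementException`. `BufferAdmissionSketch`, `shouldBuffer` and the `unbufferedTimersInStore` parameter are hypothetical names, and `Long` stands in for `KeyedTimerData`.

```java
import java.util.NavigableSet;

final class BufferAdmissionSketch {

  /**
   * Decides whether a new timer may enter the in-memory buffer.
   *
   * @param unbufferedTimersInStore true if the store may hold timers that are not in the buffer
   */
  static boolean shouldBuffer(
      NavigableSet<Long> buffer,
      int maxSize,
      boolean unbufferedTimersInStore,
      long newTimestamp) {
    boolean hasRoom = buffer.size() < maxSize;
    boolean bufferMirrorsStore = !unbufferedTimersInStore;
    if (hasRoom && bufferMirrorsStore) {
      // Nothing lives only in the store, so ordering cannot be violated.
      return true;
    }
    if (buffer.isEmpty()) {
      // Guard: buffer.last() would throw NoSuchElementException here.
      return false;
    }
    // Safe only if the new timer fires before the latest buffered timer.
    return newTimestamp < buffer.last();
  }
}
```

   Whether returning `false` is the right behavior for an empty buffer with unbuffered timers still in the store is a design decision for the PR author; the sketch only shows where the guard would go.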



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;

Review Comment:
   Hmm, I think we persist the timers in state regardless of whether we put them in the buffer. Why do we need this flag here? Can we just learn this by checking whether the state is empty? The more flags in the code, the messier the logic will be.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;
           }
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          // The timer is added to the buffer if the new timestamp will be triggered earlier than
+          // the oldest time in the current buffer.
+          // Any timers removed from the buffer will also be deleted from the scheduler.
+          if ((maxProcessTimerBufferSize > processTimeBuffer.size() && !processTimersInState)

Review Comment:
   This condition is not readable. Rewrite.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -539,63 +599,76 @@ private void reloadEventTimeTimers() {
         LOG.debug(
             "Event time timers in State is empty, filled {} timers out of {} buffer capacity",
             eventTimeBuffer.size(),
-            maxEventTimeInBuffer);
+            maxEventTimerBufferSize);
         // Reset the flag variable to indicate there are no more KeyedTimerData in State
-        maxEventTimeInBuffer = Long.MAX_VALUE;
+        eventsTimersInState = false;

Review Comment:
   Seems there is some misunderstanding in the logic here. We are not removing timers from the state, so they are still there.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;

Review Comment:
   The timers are always in state. The flag name is just wrong.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -539,63 +599,76 @@ private void reloadEventTimeTimers() {
         LOG.debug(
             "Event time timers in State is empty, filled {} timers out of {} buffer capacity",
             eventTimeBuffer.size(),
-            maxEventTimeInBuffer);
+            maxEventTimerBufferSize);
         // Reset the flag variable to indicate there are no more KeyedTimerData in State
-        maxEventTimeInBuffer = Long.MAX_VALUE;
+        eventsTimersInState = false;
       }
     }
 
-    private void loadProcessingTimeTimers() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
-          processingTimeTimerState.readIterator().read();
-      // since the iterator will reach to the end, it will be closed automatically
-      int count = 0;
-      while (iter.hasNext()) {
-        final Map.Entry<TimerKey<K>, Long> entry = iter.next();
-        final KeyedTimerData keyedTimerData =
-            TimerKey.toKeyedTimerData(
-                entry.getKey(), entry.getValue(), TimeDomain.PROCESSING_TIME, keyCoder);
+    private void reloadProcessingTimeTimers() {
+      final Iterator<KeyedTimerData<K>> iter =
+          timestampSortedProcessTimeTimerState.readIterator().read();
 
+      while (iter.hasNext() && processTimeBuffer.size() < maxProcessTimerBufferSize) {
+        final KeyedTimerData keyedTimerData = iter.next();
+        processTimeBuffer.add(keyedTimerData);
         timerRegistry.schedule(
             keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
-        ++count;
       }
-      processingTimeTimerState.closeIterators();
+      timestampSortedProcessTimeTimerState.closeIterators();
+      LOG.info("Loaded {} processing time timers in memory", processTimeBuffer.size());
 
-      LOG.info("Loaded {} processing time timers in memory", count);
+      if (processTimeBuffer.size() < maxProcessTimerBufferSize) {
+        LOG.debug(
+            "Process time timers in State is empty, filled {} timers out of {} buffer capacity",
+            processTimeBuffer.size(),
+            maxProcessTimerBufferSize);
+        // Reset the flag variable to indicate there are no more KeyedTimerData in State
+        processTimersInState = false;

Review Comment:
   Same comment as above. I don't like adding these random flags either.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;
           }
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          // The timer is added to the buffer if the new timestamp will be triggered earlier than
+          // the oldest time in the current buffer.
+          // Any timers removed from the buffer will also be deleted from the scheduler.
+          if ((maxProcessTimerBufferSize > processTimeBuffer.size() && !processTimersInState)
+              || newTimestamp
+                  < processTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
+            processTimeBuffer.add(keyedTimerData);
+            timerRegistry.schedule(keyedTimerData, newTimestamp);
+            if (processTimeBuffer.size() > maxProcessTimerBufferSize) {
+              KeyedTimerData oldKeyedTimerData = processTimeBuffer.pollLast();
+              timerRegistry.delete(oldKeyedTimerData);
+            }
+          } else {
+            processTimersInState = true;

Review Comment:
   I think this is useless.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;
           }
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          // The timer is added to the buffer if the new timestamp will be triggered earlier than
+          // the oldest time in the current buffer.
+          // Any timers removed from the buffer will also be deleted from the scheduler.
+          if ((maxProcessTimerBufferSize > processTimeBuffer.size() && !processTimersInState)
+              || newTimestamp
+                  < processTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
+            processTimeBuffer.add(keyedTimerData);
+            timerRegistry.schedule(keyedTimerData, newTimestamp);
+            if (processTimeBuffer.size() > maxProcessTimerBufferSize) {
+              KeyedTimerData oldKeyedTimerData = processTimeBuffer.pollLast();
+              timerRegistry.delete(oldKeyedTimerData);
+            }
+          } else {
+            processTimersInState = true;

Review Comment:
   These flags are just a recipe for disaster. Plus, we are preserving the timers in state no matter what.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;
           }
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          // The timer is added to the buffer if the new timestamp will be triggered earlier than
+          // the oldest time in the current buffer.
+          // Any timers removed from the buffer will also be deleted from the scheduler.
+          if ((maxProcessTimerBufferSize > processTimeBuffer.size() && !processTimersInState)

Review Comment:
   Again, a large chunk of nested if-else which is probably only understandable to you. Rewrite.
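
   One possible shape for such a rewrite, as a minimal sketch only: the admission condition is pulled into a single named predicate. `BufferAdmission`, `canBuffer`, and `stateHasUnbufferedTimers` are hypothetical names that do not appear in the PR, and plain `Long` timestamps stand in for `KeyedTimerData`. The flag is renamed on the assumption that it is meant to say "state holds timers that are not in the buffer", since timers are always persisted to state.

```java
import java.util.TreeSet;

// Hypothetical helper for illustration; not the Samza runner's actual code.
class BufferAdmission {
  /**
   * Returns true if a new timer may enter the in-memory buffer while keeping every
   * buffered timestamp <= every timestamp that lives only in state.
   */
  static boolean canBuffer(
      TreeSet<Long> buffer,
      int maxBufferSize,
      boolean stateHasUnbufferedTimers,
      long newTimestamp) {
    boolean hasRoom = buffer.size() < maxBufferSize;
    if (hasRoom && !stateHasUnbufferedTimers) {
      // The buffer mirrors state completely, so any timestamp is safe to add.
      return true;
    }
    // Guard the empty case: TreeSet.last() throws NoSuchElementException on an empty set.
    return !buffer.isEmpty() && newTimestamp < buffer.last();
  }
}
```

   With a predicate like this, each `case` in `setTimer` would reduce to one `if (canBuffer(...))` followed by the add-and-evict step.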



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kw2542 commented on a diff in pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
kw2542 commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r968795390


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -193,7 +201,7 @@ public Collection<KeyedTimerData<K>> removeReadyTimers() {
     }
     LOG.debug("Removed {} ready timers", readyTimers.size());
 
-    if (readyTimers.size() == maxReadyTimersToProcessOnce
+    if (readyTimers.size() >= maxReadyTimersToProcessOnce

Review Comment:
   This change may not be necessary since readyTimers size is incremented by one per loop.





[GitHub] [beam] dxichen commented on a diff in pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
dxichen commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r959834109


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -545,57 +596,58 @@ private void reloadEventTimeTimers() {
       }
     }
 
-    private void loadProcessingTimeTimers() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
-          processingTimeTimerState.readIterator().read();
-      // since the iterator will reach to the end, it will be closed automatically
-      int count = 0;
-      while (iter.hasNext()) {
-        final Map.Entry<TimerKey<K>, Long> entry = iter.next();
-        final KeyedTimerData keyedTimerData =
-            TimerKey.toKeyedTimerData(
-                entry.getKey(), entry.getValue(), TimeDomain.PROCESSING_TIME, keyCoder);
-
-        timerRegistry.schedule(
-            keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
-        ++count;
+    private void reloadProcessingTimeTimers() {
+      final Iterator<KeyedTimerData<K>> iter =
+          timestampSortedProcessTimeTimerState.readIterator().read();
+
+      while (iter.hasNext() && processTimeBuffer.size() < maxProcessTimerBufferSize) {
+        final KeyedTimerData keyedTimerData = iter.next();
+        processTimeBuffer.add(keyedTimerData);
       }
-      processingTimeTimerState.closeIterators();
 
-      LOG.info("Loaded {} processing time timers in memory", count);
+      timestampSortedProcessTimeTimerState.closeIterators();
+      LOG.info("Loaded {} processing time timers in memory", processTimeBuffer.size());
     }
 
-    /**
-     * Restore timer state from RocksDB. This is needed for migration of existing jobs. Give events
-     * in eventTimeTimerState, construct timestampSortedEventTimeTimerState preparing for memory
-     * reloading. TO-DO: processing time timers are still loaded into memory in one shot; will apply
-     * the same optimization mechanism as event time timer
-     */
+    /** Restore timer state from RocksDB. */
     private void init() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> eventTimersIter =
-          eventTimeTimerState.readIterator().read();
-      // use hasNext to check empty, because this is relatively cheap compared with Iterators.size()
-      if (eventTimersIter.hasNext()) {
-        final Iterator sortedEventTimerIter =
-            timestampSortedEventTimeTimerState.readIterator().read();
-
-        if (!sortedEventTimerIter.hasNext()) {
-          // inline the migration code
-          while (eventTimersIter.hasNext()) {
-            final Map.Entry<TimerKey<K>, Long> entry = eventTimersIter.next();
-            final KeyedTimerData<K> keyedTimerData =
-                TimerKey.toKeyedTimerData(
-                    entry.getKey(), entry.getValue(), TimeDomain.EVENT_TIME, keyCoder);
-            timestampSortedEventTimeTimerState.add(keyedTimerData);
-          }
-        }
-        timestampSortedEventTimeTimerState.closeIterators();
-      }
-      eventTimeTimerState.closeIterators();
+      migrateToKeyedTimerState(
+          eventTimeTimerState, timestampSortedEventTimeTimerState, TimeDomain.EVENT_TIME);
+      migrateToKeyedTimerState(
+          processingTimeTimerState,
+          timestampSortedProcessTimeTimerState,
+          TimeDomain.PROCESSING_TIME);
 
       reloadEventTimeTimers();
-      loadProcessingTimeTimers();
+      reloadProcessingTimeTimers();
+    }
+  }
+
+  /**
+   * This is needed for migration of existing jobs. Give events in timerState, construct
+   * keyedTimerState preparing for memory reloading.
+   */
+  private void migrateToKeyedTimerState(
+      SamzaMapState<TimerKey<K>, Long> timerState,
+      SamzaSetState<KeyedTimerData<K>> keyedTimerState,
+      TimeDomain timeDomain) {
+    final Iterator<Map.Entry<TimerKey<K>, Long>> timersIter = timerState.readIterator().read();
+    // use hasNext to check empty, because this is relatively cheap compared with Iterators.size()
+    if (timersIter.hasNext()) {
+      final Iterator keyedTimerIter = keyedTimerState.readIterator().read();
+
+      if (!keyedTimerIter.hasNext()) {
+        // Migrate from timerState to keyedTimerState
+        while (timersIter.hasNext()) {
+          final Map.Entry<TimerKey<K>, Long> entry = timersIter.next();
+          final KeyedTimerData<K> keyedTimerData =
+              TimerKey.toKeyedTimerData(entry.getKey(), entry.getValue(), timeDomain, keyCoder);
+          keyedTimerState.add(keyedTimerData);
+        }
+      }
+      keyedTimerState.closeIterators();

Review Comment:
   This could be removed after all the samza-beam jobs have migrated to use only keyed timer data. Will add a TODO.





[GitHub] [beam] dxichen commented on a diff in pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
dxichen commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r959833468


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -307,7 +327,11 @@ public void setTimer(TimerData timerData) {
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          // Append to buffer iff not full
+          if (processTimeBuffer.size() < maxProcessTimerBufferSize) {
+            processTimeBuffer.add(keyedTimerData);
+          }

Review Comment:
   No, we don't need to replace any events in the buffer as we do for event time, because process time is FIFO only. If the buffer is full we don't do anything, and the timer is simply stored in the state.
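
   The append-only behaviour described in this comment can be sketched as below. This is an illustration, not the runner's code: `BoundedTimerBuffer` and its methods are hypothetical, `Long` timestamps stand in for `KeyedTimerData`, and a `TreeSet` stands in for the persisted state. It relies on the assumption stated in the comment, namely that processing-time timers arrive in FIFO (non-decreasing timestamp) order. If an earlier timer can be set later, a full buffer would miss it until the next reload.

```java
import java.util.TreeSet;

// Hypothetical stand-in for the buffer/state pair; not the Samza runner's classes.
class BoundedTimerBuffer {
  private final TreeSet<Long> state = new TreeSet<>();  // stands in for the persisted, sorted state
  private final TreeSet<Long> buffer = new TreeSet<>(); // bounded in-memory buffer
  private final int maxBufferSize;

  BoundedTimerBuffer(int maxBufferSize) {
    this.maxBufferSize = maxBufferSize;
  }

  void setTimer(long timestampMillis) {
    state.add(timestampMillis); // persist first, unconditionally
    if (buffer.size() < maxBufferSize) {
      buffer.add(timestampMillis); // append to the buffer only if it is not full
    }
    // Otherwise do nothing: the timer stays in state until a later reload.
  }

  int bufferSize() {
    return buffer.size();
  }

  int stateSize() {
    return state.size();
  }

  boolean isBuffered(long timestampMillis) {
    return buffer.contains(timestampMillis);
  }
}
```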





[GitHub] [beam] kw2542 commented on a diff in pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
kw2542 commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r968795390


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -193,7 +201,7 @@ public Collection<KeyedTimerData<K>> removeReadyTimers() {
     }
     LOG.debug("Removed {} ready timers", readyTimers.size());
 
-    if (readyTimers.size() == maxReadyTimersToProcessOnce
+    if (readyTimers.size() >= maxReadyTimersToProcessOnce

Review Comment:
   This change may not be necessary since readyTimers size is incremented by one per loop, but should be safe to update to `>=`
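
   A small sketch of why the two comparisons agree here. `ReadyTimerCap.takeReady` is a hypothetical reduction of the loop, not the actual `removeReadyTimers` body. Because the list grows by exactly one element per iteration and the loop stops at the cap, its size can never exceed the cap, so `==` and `>=` give the same answer afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of the ready-timer loop, for illustration only.
class ReadyTimerCap {
  static List<Long> takeReady(List<Long> sortedTimers, long watermark, int maxToProcessOnce) {
    List<Long> ready = new ArrayList<>();
    for (long timestamp : sortedTimers) {
      // Stop at the first timer past the watermark, or once the cap is reached.
      if (timestamp > watermark || ready.size() == maxToProcessOnce) {
        break;
      }
      ready.add(timestamp); // size grows by exactly one per iteration
    }
    return ready;
  }
}
```

   The `>=` form only matters if a later change adds several timers per iteration, which is why it is a safe defensive choice.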





[GitHub] [beam] dxichen commented on a diff in pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
dxichen commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r959836682


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,18 +302,15 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries

Review Comment:
   This comment no longer represents the code below, which had completely changed even before my change. Removed 1) and kept 2).



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -444,6 +470,16 @@ private class SamzaTimerState {
                           new TimerKeyCoder<>(keyCoder, windowCoder),
                           VarLongCoder.of()));
 
+      this.timestampSortedProcessTimeTimerState =
+          (SamzaSetState<KeyedTimerData<K>>)
+              nonKeyedStateInternalsFactory
+                  .stateInternalsForKey(null)
+                  .state(
+                      StateNamespaces.global(),
+                      StateTags.set(
+                          timerStateId + "-pts",

Review Comment:
   Good point, will rename it to process-timestamp-sorted.





[GitHub] [beam] github-actions[bot] commented on pull request #22976: Buffered process time timers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22976:
URL: https://github.com/apache/beam/pull/22976#issuecomment-1339272635

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

