You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text version.
Posted to commits@beam.apache.org by xi...@apache.org on 2021/08/26 22:22:02 UTC

[beam] branch master updated: [BEAM-12742] SamzaTimerInternalsFactory#deleteTimer(TimerData) does not properly delete timers (#15389)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c0493f  [BEAM-12742] SamzaTimerInternalsFactory#deleteTimer(TimerData) does not properly delete timers (#15389)
4c0493f is described below

commit 4c0493f146249ee34dbd1233e0b69992b6e4af54
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Thu Aug 26 15:20:54 2021 -0700

    [BEAM-12742] SamzaTimerInternalsFactory#deleteTimer(TimerData) does not properly delete timers (#15389)
---
 .../apache/beam/runners/core/TimerInternals.java   |  4 +-
 .../beam/runners/samza/runtime/KeyedInternals.java |  8 ++-
 .../samza/runtime/SamzaStoreStateInternals.java    |  9 +--
 .../samza/runtime/SamzaTimerInternalsFactory.java  | 80 ++++++++++++----------
 .../beam/runners/samza/state/SamzaMapState.java    |  3 +
 .../beam/runners/samza/state/SamzaSetState.java    |  3 +
 .../runners/portability/samza_runner_test.py       |  4 --
 7 files changed, 61 insertions(+), 50 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index c7f0274..965be82 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -79,11 +79,11 @@ public interface TimerInternals {
   void deleteTimer(
       StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain);
 
-  /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
+  /** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
   @Deprecated
   void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId);
 
-  /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
+  /** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
   @Deprecated
   void deleteTimer(TimerData timerKey);
 
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
index 9a8d852..6501247 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
@@ -28,6 +28,8 @@ import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.samza.state.SamzaMapState;
+import org.apache.beam.runners.samza.state.SamzaSetState;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateContext;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -87,8 +89,10 @@ class KeyedInternals<K> {
     final List<State> states = threadLocalKeyedStates.get().states;
     states.forEach(
         state -> {
-          if (state instanceof SamzaStoreStateInternals.KeyValueIteratorState) {
-            ((SamzaStoreStateInternals.KeyValueIteratorState) state).closeIterators();
+          if (state instanceof SamzaMapState) {
+            ((SamzaMapState) state).closeIterators();
+          } else if (state instanceof SamzaSetState) {
+            ((SamzaSetState) state).closeIterators();
           }
         });
     states.clear();
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 1861404..a9e38a7 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -307,11 +307,6 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
     }
   }
 
-  /** An internal State interface that holds underlying KeyValueIterators. */
-  interface KeyValueIteratorState {
-    void closeIterators();
-  }
-
   private abstract class AbstractSamzaState<T> {
     private final StateNamespace namespace;
     private final String addressId;
@@ -537,7 +532,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
     }
   }
 
-  private class SamzaSetStateImpl<T> implements SamzaSetState<T>, KeyValueIteratorState {
+  private class SamzaSetStateImpl<T> implements SamzaSetState<T> {
     private final SamzaMapStateImpl<T, Boolean> mapState;
 
     private SamzaSetStateImpl(
@@ -630,7 +625,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
   }
 
   private class SamzaMapStateImpl<KeyT, ValueT> extends AbstractSamzaState<ValueT>
-      implements SamzaMapState<KeyT, ValueT>, KeyValueIteratorState {
+      implements SamzaMapState<KeyT, ValueT> {
 
     private final Coder<KeyT> keyCoder;
     private final int storeKeySize;
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index f791c3a..7814217 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -267,15 +267,13 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
       }
 
       if (lastTimestamp != null) {
-        final TimerData lastTimerData =
-            TimerData.of(
-                timerData.getTimerId(),
-                timerData.getTimerFamilyId(),
-                timerData.getNamespace(),
-                new Instant(lastTimestamp),
-                new Instant(lastTimestamp),
-                timerData.getDomain());
-        deleteTimer(lastTimerData, false);
+        deleteTimer(
+            timerData.getNamespace(),
+            timerData.getTimerId(),
+            timerData.getTimerFamilyId(),
+            new Instant(lastTimestamp),
+            new Instant(lastTimestamp),
+            timerData.getDomain());
       }
 
       // persist it first
@@ -284,7 +282,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
       // TO-DO: apply the same memory optimization over processing timers
       switch (timerData.getDomain()) {
         case EVENT_TIME:
-          /**
+          /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
@@ -319,36 +317,50 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
       }
     }
 
+    /** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
+    @Override
+    @Deprecated
+    public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
+      deleteTimer(namespace, timerId, timerFamilyId, TimeDomain.EVENT_TIME);
+    }
+
+    /** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
+    @Override
+    @Deprecated
+    public void deleteTimer(TimerData timerData) {
+      deleteTimer(
+          timerData.getNamespace(),
+          timerData.getTimerId(),
+          timerData.getTimerFamilyId(),
+          timerData.getDomain());
+    }
+
     @Override
     public void deleteTimer(
         StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) {
-      TimerKey<K> timerKey = TimerKey.of(key, namespace, timerId, timerFamilyId);
-
-      Long lastTimestamp = state.get(timerKey, timeDomain);
+      final TimerKey<K> timerKey = TimerKey.of(key, namespace, timerId, timerFamilyId);
+      final Long lastTimestamp = state.get(timerKey, timeDomain);
 
       if (lastTimestamp == null) {
         return;
       }
 
-      Instant timestamp = Instant.ofEpochMilli(lastTimestamp);
-      deleteTimer(TimerData.of(timerId, namespace, timestamp, timestamp, timeDomain));
+      final Instant timestamp = Instant.ofEpochMilli(lastTimestamp);
+      deleteTimer(namespace, timerId, timerFamilyId, timestamp, timestamp, timeDomain);
     }
 
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
-      deleteTimer(namespace, timerId, timerFamilyId, TimeDomain.EVENT_TIME);
-    }
-
-    @Override
-    public void deleteTimer(TimerData timerData) {
-      deleteTimer(timerData, true);
-    }
-
-    private void deleteTimer(TimerData timerData, boolean updateState) {
+    private void deleteTimer(
+        StateNamespace namespace,
+        String timerId,
+        String timerFamilyId,
+        Instant timestamp,
+        Instant outputTimestamp,
+        TimeDomain timeDomain) {
+      final TimerData timerData =
+          TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain);
       final KeyedTimerData<K> keyedTimerData = new KeyedTimerData<>(keyBytes, key, timerData);
-      if (updateState) {
-        state.deletePersisted(keyedTimerData);
-      }
+
+      state.deletePersisted(keyedTimerData);
 
       switch (timerData.getDomain()) {
         case EVENT_TIME:
@@ -515,8 +527,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
         maxEventTimeInBuffer = keyedTimerData.getTimerData().getTimestamp().getMillis();
       }
 
-      ((SamzaStoreStateInternals.KeyValueIteratorState) timestampSortedEventTimeTimerState)
-          .closeIterators();
+      timestampSortedEventTimeTimerState.closeIterators();
       LOG.info("Loaded {} event time timers in memory", eventTimeBuffer.size());
 
       if (eventTimeBuffer.size() < maxEventTimerBufferSize) {
@@ -544,7 +555,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
             keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
         ++count;
       }
-      ((SamzaStoreStateInternals.KeyValueIteratorState) processingTimeTimerState).closeIterators();
+      processingTimeTimerState.closeIterators();
 
       LOG.info("Loaded {} processing time timers in memory", count);
     }
@@ -573,10 +584,9 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
             timestampSortedEventTimeTimerState.add(keyedTimerData);
           }
         }
-        ((SamzaStoreStateInternals.KeyValueIteratorState) timestampSortedEventTimeTimerState)
-            .closeIterators();
+        timestampSortedEventTimeTimerState.closeIterators();
       }
-      ((SamzaStoreStateInternals.KeyValueIteratorState) eventTimeTimerState).closeIterators();
+      eventTimeTimerState.closeIterators();
 
       reloadEventTimeTimers();
       loadProcessingTimeTimers();
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaMapState.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaMapState.java
index 2afba06..a8741fb 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaMapState.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaMapState.java
@@ -34,4 +34,7 @@ public interface SamzaMapState<KeyT, ValueT> extends MapState<KeyT, ValueT> {
    * @return a {@link ReadableState} of an iterator
    */
   ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readIterator();
+
+  /** Closes the iterator returned from {@link SamzaMapState#readIterator()}. */
+  void closeIterators();
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaSetState.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaSetState.java
index a6785c7..8af82fc 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaSetState.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaSetState.java
@@ -33,4 +33,7 @@ public interface SamzaSetState<T> extends SetState<T> {
    * @return a {@link ReadableState} of an iterator
    */
   ReadableState<Iterator<T>> readIterator();
+
+  /** Closes the iterator returned from {@link SamzaSetState#readIterator()}. */
+  void closeIterators();
 }
diff --git a/sdks/python/apache_beam/runners/portability/samza_runner_test.py b/sdks/python/apache_beam/runners/portability/samza_runner_test.py
index ea88c12..2f60ad8 100644
--- a/sdks/python/apache_beam/runners/portability/samza_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/samza_runner_test.py
@@ -153,10 +153,6 @@ class SamzaRunnerTest(portable_runner_test.PortableRunnerTest):
     # Skip until Samza portable runner supports clearing timer.
     raise unittest.SkipTest("BEAM-12774")
 
-  def test_pardo_timers_clear(self):
-    # Skip until Samza portable runner supports clearing timer.
-    raise unittest.SkipTest("BEAM-12774")
-
   def test_register_finalizations(self):
     # Skip until Samza runner supports bundle finalization.
     raise unittest.SkipTest("BEAM-12615")