You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2021/08/26 22:22:02 UTC
[beam] branch master updated: [BEAM-12742] SamzaTimerInternalsFactory#deleteTimer(TimerData) does not properly delete timers (#15389)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4c0493f [BEAM-12742] SamzaTimerInternalsFactory#deleteTimer(TimerData) does not properly delete timers (#15389)
4c0493f is described below
commit 4c0493f146249ee34dbd1233e0b69992b6e4af54
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Thu Aug 26 15:20:54 2021 -0700
[BEAM-12742] SamzaTimerInternalsFactory#deleteTimer(TimerData) does not properly delete timers (#15389)
---
.../apache/beam/runners/core/TimerInternals.java | 4 +-
.../beam/runners/samza/runtime/KeyedInternals.java | 8 ++-
.../samza/runtime/SamzaStoreStateInternals.java | 9 +--
.../samza/runtime/SamzaTimerInternalsFactory.java | 80 ++++++++++++----------
.../beam/runners/samza/state/SamzaMapState.java | 3 +
.../beam/runners/samza/state/SamzaSetState.java | 3 +
.../runners/portability/samza_runner_test.py | 4 --
7 files changed, 61 insertions(+), 50 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index c7f0274..965be82 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -79,11 +79,11 @@ public interface TimerInternals {
void deleteTimer(
StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain);
- /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
+ /** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
@Deprecated
void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId);
- /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
+ /** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
@Deprecated
void deleteTimer(TimerData timerKey);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
index 9a8d852..6501247 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
@@ -28,6 +28,8 @@ import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.samza.state.SamzaMapState;
+import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.TimeDomain;
@@ -87,8 +89,10 @@ class KeyedInternals<K> {
final List<State> states = threadLocalKeyedStates.get().states;
states.forEach(
state -> {
- if (state instanceof SamzaStoreStateInternals.KeyValueIteratorState) {
- ((SamzaStoreStateInternals.KeyValueIteratorState) state).closeIterators();
+ if (state instanceof SamzaMapState) {
+ ((SamzaMapState) state).closeIterators();
+ } else if (state instanceof SamzaSetState) {
+ ((SamzaSetState) state).closeIterators();
}
});
states.clear();
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 1861404..a9e38a7 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -307,11 +307,6 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
}
}
- /** An internal State interface that holds underlying KeyValueIterators. */
- interface KeyValueIteratorState {
- void closeIterators();
- }
-
private abstract class AbstractSamzaState<T> {
private final StateNamespace namespace;
private final String addressId;
@@ -537,7 +532,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
}
}
- private class SamzaSetStateImpl<T> implements SamzaSetState<T>, KeyValueIteratorState {
+ private class SamzaSetStateImpl<T> implements SamzaSetState<T> {
private final SamzaMapStateImpl<T, Boolean> mapState;
private SamzaSetStateImpl(
@@ -630,7 +625,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
}
private class SamzaMapStateImpl<KeyT, ValueT> extends AbstractSamzaState<ValueT>
- implements SamzaMapState<KeyT, ValueT>, KeyValueIteratorState {
+ implements SamzaMapState<KeyT, ValueT> {
private final Coder<KeyT> keyCoder;
private final int storeKeySize;
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index f791c3a..7814217 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -267,15 +267,13 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
}
if (lastTimestamp != null) {
- final TimerData lastTimerData =
- TimerData.of(
- timerData.getTimerId(),
- timerData.getTimerFamilyId(),
- timerData.getNamespace(),
- new Instant(lastTimestamp),
- new Instant(lastTimestamp),
- timerData.getDomain());
- deleteTimer(lastTimerData, false);
+ deleteTimer(
+ timerData.getNamespace(),
+ timerData.getTimerId(),
+ timerData.getTimerFamilyId(),
+ new Instant(lastTimestamp),
+ new Instant(lastTimestamp),
+ timerData.getDomain());
}
// persist it first
@@ -284,7 +282,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
// TO-DO: apply the same memory optimization over processing timers
switch (timerData.getDomain()) {
case EVENT_TIME:
- /**
+ /*
* To determine if the upcoming KeyedTimerData could be added to the Buffer while
* guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
* timestamp eviction priority:
@@ -319,36 +317,50 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
}
}
+ /** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
+ @Override
+ @Deprecated
+ public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
+ deleteTimer(namespace, timerId, timerFamilyId, TimeDomain.EVENT_TIME);
+ }
+
+ /** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
+ @Override
+ @Deprecated
+ public void deleteTimer(TimerData timerData) {
+ deleteTimer(
+ timerData.getNamespace(),
+ timerData.getTimerId(),
+ timerData.getTimerFamilyId(),
+ timerData.getDomain());
+ }
+
@Override
public void deleteTimer(
StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) {
- TimerKey<K> timerKey = TimerKey.of(key, namespace, timerId, timerFamilyId);
-
- Long lastTimestamp = state.get(timerKey, timeDomain);
+ final TimerKey<K> timerKey = TimerKey.of(key, namespace, timerId, timerFamilyId);
+ final Long lastTimestamp = state.get(timerKey, timeDomain);
if (lastTimestamp == null) {
return;
}
- Instant timestamp = Instant.ofEpochMilli(lastTimestamp);
- deleteTimer(TimerData.of(timerId, namespace, timestamp, timestamp, timeDomain));
+ final Instant timestamp = Instant.ofEpochMilli(lastTimestamp);
+ deleteTimer(namespace, timerId, timerFamilyId, timestamp, timestamp, timeDomain);
}
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
- deleteTimer(namespace, timerId, timerFamilyId, TimeDomain.EVENT_TIME);
- }
-
- @Override
- public void deleteTimer(TimerData timerData) {
- deleteTimer(timerData, true);
- }
-
- private void deleteTimer(TimerData timerData, boolean updateState) {
+ private void deleteTimer(
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ final TimerData timerData =
+ TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain);
final KeyedTimerData<K> keyedTimerData = new KeyedTimerData<>(keyBytes, key, timerData);
- if (updateState) {
- state.deletePersisted(keyedTimerData);
- }
+
+ state.deletePersisted(keyedTimerData);
switch (timerData.getDomain()) {
case EVENT_TIME:
@@ -515,8 +527,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
maxEventTimeInBuffer = keyedTimerData.getTimerData().getTimestamp().getMillis();
}
- ((SamzaStoreStateInternals.KeyValueIteratorState) timestampSortedEventTimeTimerState)
- .closeIterators();
+ timestampSortedEventTimeTimerState.closeIterators();
LOG.info("Loaded {} event time timers in memory", eventTimeBuffer.size());
if (eventTimeBuffer.size() < maxEventTimerBufferSize) {
@@ -544,7 +555,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
++count;
}
- ((SamzaStoreStateInternals.KeyValueIteratorState) processingTimeTimerState).closeIterators();
+ processingTimeTimerState.closeIterators();
LOG.info("Loaded {} processing time timers in memory", count);
}
@@ -573,10 +584,9 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
timestampSortedEventTimeTimerState.add(keyedTimerData);
}
}
- ((SamzaStoreStateInternals.KeyValueIteratorState) timestampSortedEventTimeTimerState)
- .closeIterators();
+ timestampSortedEventTimeTimerState.closeIterators();
}
- ((SamzaStoreStateInternals.KeyValueIteratorState) eventTimeTimerState).closeIterators();
+ eventTimeTimerState.closeIterators();
reloadEventTimeTimers();
loadProcessingTimeTimers();
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaMapState.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaMapState.java
index 2afba06..a8741fb 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaMapState.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaMapState.java
@@ -34,4 +34,7 @@ public interface SamzaMapState<KeyT, ValueT> extends MapState<KeyT, ValueT> {
* @return a {@link ReadableState} of an iterator
*/
ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readIterator();
+
+ /** Closes the iterator returned from {@link SamzaMapState#readIterator()}. */
+ void closeIterators();
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaSetState.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaSetState.java
index a6785c7..8af82fc 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaSetState.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/state/SamzaSetState.java
@@ -33,4 +33,7 @@ public interface SamzaSetState<T> extends SetState<T> {
* @return a {@link ReadableState} of an iterator
*/
ReadableState<Iterator<T>> readIterator();
+
+ /** Closes the iterator returned from {@link SamzaSetState#readIterator()}. */
+ void closeIterators();
}
diff --git a/sdks/python/apache_beam/runners/portability/samza_runner_test.py b/sdks/python/apache_beam/runners/portability/samza_runner_test.py
index ea88c12..2f60ad8 100644
--- a/sdks/python/apache_beam/runners/portability/samza_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/samza_runner_test.py
@@ -153,10 +153,6 @@ class SamzaRunnerTest(portable_runner_test.PortableRunnerTest):
# Skip until Samza portable runner supports clearing timer.
raise unittest.SkipTest("BEAM-12774")
- def test_pardo_timers_clear(self):
- # Skip until Samza portable runner supports clearing timer.
- raise unittest.SkipTest("BEAM-12774")
-
def test_register_finalizations(self):
# Skip until Samza runner supports bundle finalization.
raise unittest.SkipTest("BEAM-12615")