Posted to commits@flink.apache.org by da...@apache.org on 2020/08/25 17:50:02 UTC
[flink-training] branch master updated: [FLINK-18630] Redo the LongRideAlerts solutions to cover a corner case (#13)
This is an automated email from the ASF dual-hosted git repository.
danderson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git
The following commit(s) were added to refs/heads/master by this push:
new ea4a66e [FLINK-18630] Redo the LongRideAlerts solutions to cover a corner case (#13)
ea4a66e is described below
commit ea4a66e97dd211bd8f8b8e415e3e427c30e4746b
Author: David Anderson <da...@alpinegizmo.com>
AuthorDate: Tue Aug 25 19:49:55 2020 +0200
[FLINK-18630] Redo the LongRideAlerts solutions to cover a corner case (#13)
---
long-ride-alerts/DISCUSSION.md | 16 +++++++++
long-ride-alerts/README.md | 18 ++++++++--
.../solutions/longrides/LongRidesSolution.java | 41 ++++++++++++----------
.../longrides/scala/LongRidesSolution.scala | 40 +++++++++++----------
.../exercises/longrides/LongRidesTest.java | 10 ++++++
5 files changed, 86 insertions(+), 39 deletions(-)
diff --git a/long-ride-alerts/DISCUSSION.md b/long-ride-alerts/DISCUSSION.md
index bf54c4f..f931ffa 100644
--- a/long-ride-alerts/DISCUSSION.md
+++ b/long-ride-alerts/DISCUSSION.md
@@ -21,7 +21,23 @@ under the License.
(Discussion of [Lab: `ProcessFunction` and Timers (Long Ride Alerts)](./))
+It would be interesting to test that the solution does not leak state.
+A good way to write unit tests for a `KeyedProcessFunction` to check for state retention, etc., is to
+use the test harnesses described in the
+[documentation on testing](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators).
+
+In fact, the reference solutions will leak state in the case where a START event is missing. They also
+leak in the case where the alert is generated, but then the END event does eventually arrive (after `onTimer()`
+has cleared the matching START event).
+
+This could be addressed either by using [state TTL](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl),
+or by using another timer that eventually
+clears any remaining state. There is a tradeoff here, however: once that state has been removed,
+any matching events that are not actually missing, but merely very, very late, will cause erroneous alerts.
+
+This tradeoff between keeping state indefinitely versus occasionally getting things wrong when events are
+exceptionally late is a challenge that is inherent to stateful stream processing.
-----
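Both leaks, and the tradeoff that comes with plugging them using "another timer", can be reproduced in a small Flink-free simulation. This is a sketch under stated assumptions, not Flink code: `CleanupSketch`, its `Ride` class, and the six-hour `RETENTION` are hypothetical. A `HashMap` stands in for keyed `ValueState`, and a `TreeMap` keyed by timestamp stands in for the event-time `TimerService` (timers at or before the watermark fire in timestamp order). With `cleanupEnabled` set to `false` it follows the logic of the reference solutions in this commit; with `true` it also gives a stored END event a cleanup timer, which is one possible way to implement the timer-based option (state TTL is not modeled).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Flink-free simulation: a HashMap plays keyed ValueState, a TreeMap plays the
// event-time TimerService. All names and the RETENTION value are hypothetical.
class CleanupSketch {
    static final long HOUR = 60 * 60 * 1000L;
    static final long RETENTION = 6 * HOUR; // assumed: how long an END may wait for its START

    // Hypothetical stand-in for TaxiRide (times in epoch millis).
    static final class Ride {
        final long rideId, startTime, endTime;
        final boolean isStart;
        Ride(long rideId, boolean isStart, long startTime, long endTime) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.startTime = startTime;
            this.endTime = endTime;
        }
    }

    final boolean cleanupEnabled;
    final Map<Long, Ride> state = new HashMap<>();
    final TreeMap<Long, TreeSet<Long>> timers = new TreeMap<>();
    final List<Ride> alerts = new ArrayList<>();

    CleanupSketch(boolean cleanupEnabled) {
        this.cleanupEnabled = cleanupEnabled;
    }

    // A stored START always has an alert timer; a stored END has a cleanup timer only if enabled.
    boolean hasTimer(Ride stored) {
        return stored.isStart || cleanupEnabled;
    }

    long timerTime(Ride stored) {
        return stored.isStart ? stored.startTime + 2 * HOUR : stored.endTime + RETENTION;
    }

    void processElement(Ride ride) {
        Ride previous = state.get(ride.rideId);
        if (previous == null) {
            // first event seen for this ride: keep it and register its timer (if any)
            state.put(ride.rideId, ride);
            if (hasTimer(ride)) {
                timers.computeIfAbsent(timerTime(ride), t -> new TreeSet<>()).add(ride.rideId);
            }
        } else {
            // second event: delete the stored event's timer (same timestamp) and clear the state
            if (hasTimer(previous)) {
                TreeSet<Long> keys = timers.get(timerTime(previous));
                if (keys != null && keys.remove(ride.rideId) && keys.isEmpty()) {
                    timers.remove(timerTime(previous));
                }
            }
            state.remove(ride.rideId);
        }
    }

    // Fire every timer at or before the watermark, earliest first.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (long rideId : timers.pollFirstEntry().getValue()) {
                Ride stored = state.remove(rideId);
                if (stored.isStart) {
                    alerts.add(stored); // a START with no END after two hours
                }
                // otherwise an END waited longer than RETENTION and is dropped
            }
        }
    }
}
```

With cleanup disabled, an END whose START is late (or missing) sits in state indefinitely, and so does an END that arrives after its alert has fired. With cleanup enabled, the END is dropped after `RETENTION`, but a START that shows up after that is treated as a new ride: its alert time is already behind the watermark, so the timer fires on the next watermark and reports a ride that actually lasted one hour.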
diff --git a/long-ride-alerts/README.md b/long-ride-alerts/README.md
index f6514a5..a5d2ac7 100644
--- a/long-ride-alerts/README.md
+++ b/long-ride-alerts/README.md
@@ -62,13 +62,25 @@ The resulting stream should be printed to standard out.
<details>
<summary><strong>Overall approach</strong></summary>
-This exercise revolves around using a `ProcessFunction` to manage some keyed state and event time timers, and doing so in a way that works even when the END event for a given `rideId` arrives before the START (which will happen). The challenge is figuring out what state to keep, and when to set and clear that state.
+This exercise revolves around using a `ProcessFunction` to manage some keyed state and event time timers,
+and doing so in a way that works even when the END event for a given `rideId` arrives before the START (which can happen).
+The challenge is figuring out what state to keep, and when to set and clear that state.
+You will want to use event time timers that fire two hours after an incoming START event, and in the `onTimer()` method,
+collect START events to the output only if a matching END event hasn't yet arrived.
</details>
<details>
-<summary><strong>Timers and State</strong></summary>
+<summary><strong>State and timers</strong></summary>
-You will want to use event time timers that fire two hours after the incoming events, and in the `onTimer()` method, collect START events to the output only if a matching END event hasn't yet arrived. As for what state to keep, it is enough to remember the "last" event for each `rideId`, where "last" is based on event time and ride type (START vs END — yes, there are rides where the START and END have the same timestamp), rather than the order in which the events are processed. The [...]
+There are many possible solutions for this exercise, but in general it is enough to keep one
+`TaxiRide` in state (one `TaxiRide` for each key, or `rideId`). The approach used in the reference solution is to
+store whichever event arrives first (the START or the END), and if it's a START event,
+create a timer for two hours later. If and when the other event (for the same `rideId`) arrives,
+carefully clean things up.
+
+It is possible to arrange this so that whenever `onTimer()` is called, you know that
+an alert (i.e., the ride kept in state) should be emitted. Writing the code this way conveniently
+puts all of the complex business logic together in one place (in the `processElement()` method).
</details>
## Documentation
diff --git a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
index dc6ac2e..426e2db 100644
--- a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
+++ b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -65,41 +64,47 @@ public class LongRidesSolution extends ExerciseBase {
}
private static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> {
- // keyed, managed state
- // holds an END event if the ride has ended, otherwise a START event
+
private ValueState<TaxiRide> rideState;
@Override
public void open(Configuration config) {
- ValueStateDescriptor<TaxiRide> startDescriptor =
- new ValueStateDescriptor<>("saved ride", TaxiRide.class);
- rideState = getRuntimeContext().getState(startDescriptor);
+ ValueStateDescriptor<TaxiRide> stateDescriptor =
+ new ValueStateDescriptor<>("ride event", TaxiRide.class);
+ rideState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) throws Exception {
- TimerService timerService = context.timerService();
+ TaxiRide previousRideEvent = rideState.value();
- if (ride.isStart) {
- // the matching END might have arrived first; don't overwrite it
- if (rideState.value() == null) {
- rideState.update(ride);
+ if (previousRideEvent == null) {
+ rideState.update(ride);
+ if (ride.isStart) {
+ context.timerService().registerEventTimeTimer(getTimerTime(ride));
}
} else {
- rideState.update(ride);
+ if (!ride.isStart) {
+ // it's an END event, so the saved event was the START, and its timer
+ // hasn't fired yet; we can safely delete that timer
+ context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent));
+ }
+ // both events have now been seen, so we can clear the state
+ rideState.clear();
}
-
- timerService.registerEventTimeTimer(ride.getEventTime() + 120 * 60 * 1000);
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) throws Exception {
- TaxiRide savedRide = rideState.value();
- if (savedRide != null && savedRide.isStart) {
- out.collect(savedRide);
- }
+
+ // if we get here, we know that the ride started two hours ago, and the END hasn't been processed
+ out.collect(rideState.value());
rideState.clear();
}
+
+ private long getTimerTime(TaxiRide ride) {
+ return ride.startTime.plusSeconds(120 * 60).toEpochMilli();
+ }
}
}
diff --git a/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala b/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
index beff3eb..026e186 100644
--- a/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
+++ b/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
@@ -18,6 +18,7 @@
package org.apache.flink.training.solutions.longrides.scala
+import scala.concurrent.duration._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
@@ -57,40 +58,43 @@ object LongRidesSolution {
}
class MatchFunction extends KeyedProcessFunction[Long, TaxiRide, TaxiRide] {
- // keyed, managed state
- // holds an END event if the ride has ended, otherwise a START event
lazy val rideState: ValueState[TaxiRide] = getRuntimeContext.getState(
- new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
+ new ValueStateDescriptor[TaxiRide]("ride event", classOf[TaxiRide]))
override def processElement(ride: TaxiRide,
context: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#Context,
out: Collector[TaxiRide]): Unit = {
- val timerService = context.timerService
- if (ride.isStart) {
- // the matching END might have arrived first; don't overwrite it
- if (rideState.value() == null) {
- rideState.update(ride)
- }
- }
- else {
+ val previousRideEvent = rideState.value()
+
+ if (previousRideEvent == null) {
rideState.update(ride)
+ if (ride.isStart) {
+ context.timerService().registerEventTimeTimer(getTimerTime(ride))
+ }
+ } else {
+ if (!ride.isStart) {
+ // it's an END event, so the saved event was the START, and its timer
+ // hasn't fired yet; we can safely delete that timer
+ context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent))
+ }
+ // both events have now been seen, so we can clear the state
+ rideState.clear()
}
-
- timerService.registerEventTimeTimer(ride.getEventTime + 120 * 60 * 1000)
}
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#OnTimerContext,
out: Collector[TaxiRide]): Unit = {
- val savedRide = rideState.value
-
- if (savedRide != null && savedRide.isStart) {
- out.collect(savedRide)
- }
+ // if we get here, we know that the ride started two hours ago, and the END hasn't been processed
+ out.collect(rideState.value())
rideState.clear()
}
+
+ private def getTimerTime(ride: TaxiRide) = {
+ ride.startTime.toEpochMilli + 2.hours.toMillis
+ }
}
}
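The Java and Scala versions of `getTimerTime()` are written differently but should produce the same timestamp. That value matters because `deleteEventTimeTimer()` only cancels a timer registered for exactly the same timestamp, so the function must return the same result when called with the stored START event as it did when the timer was registered. A small plain-Java check of the arithmetic, where `TimerTimeCheck` and its methods are hypothetical names and the Scala expression is transcribed into `java.time` calls:

```java
import java.time.Duration;
import java.time.Instant;

class TimerTimeCheck {
    // Formula used by the Java solution's getTimerTime()
    static long javaTimerTime(Instant startTime) {
        return startTime.plusSeconds(120 * 60).toEpochMilli();
    }

    // Java transcription of the Scala solution's startTime.toEpochMilli + 2.hours.toMillis
    static long scalaTimerTime(Instant startTime) {
        return startTime.toEpochMilli() + Duration.ofHours(2).toMillis();
    }
}
```

Because the result depends only on the START event's `startTime`, the END branch of `processElement()` can recompute it from `previousRideEvent` instead of keeping the timer timestamp in a second piece of state.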
diff --git a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
index c0646c6..7d5a24e 100644
--- a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
+++ b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
@@ -88,6 +88,16 @@ public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
assertEquals(Collections.singletonList(rideStarted), results(source));
}
+ @Test
+ public void startIsDelayedMoreThanTwoHours() throws Exception {
+ TaxiRide rideStarted = startRide(1, BEGINNING);
+ TaxiRide rideEndedAfter1Hour = endRide(rideStarted, BEGINNING.plusSeconds(60 * 60));
+ Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 * 60).toEpochMilli();
+
+ TestRideSource source = new TestRideSource(rideEndedAfter1Hour, mark2HoursAfterEnd, rideStarted);
+ assertEquals(Collections.emptyList(), results(source));
+ }
+
private TaxiRide testRide(long rideId, Boolean isStart, Instant startTime, Instant endTime) {
return new TaxiRide(rideId, isStart, startTime, endTime, -73.9947F, 40.750626F, -73.9947F, 40.750626F, (short) 1, 0, 0);
}
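The new `startIsDelayedMoreThanTwoHours` test feeds an END one hour after `BEGINNING`, then a watermark two hours after that END, and only then the START. A Flink-free replay of that input shows why this corner case breaks the previous solution but not the new one. It is a sketch, not the training code: `CornerCaseReplay`, `Ride`, `OldLogic`, and `NewLogic` are hypothetical; a `HashMap` stands in for keyed state and a `TreeMap` for the event-time timer service; and it assumes the end of input advances the watermark far enough to fire any remaining timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

class CornerCaseReplay {
    static final long HOUR = 60 * 60 * 1000L;

    // Hypothetical stand-in for TaxiRide (times in epoch millis).
    static final class Ride {
        final long rideId, startTime, endTime;
        final boolean isStart;
        Ride(long rideId, boolean isStart, long startTime, long endTime) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.startTime = startTime;
            this.endTime = endTime;
        }
    }

    // Shared plumbing: keyed state, event-time timers, collected alerts.
    abstract static class Sim {
        final Map<Long, Ride> state = new HashMap<>();
        final TreeMap<Long, TreeSet<Long>> timers = new TreeMap<>();
        final List<Ride> alerts = new ArrayList<>();
        abstract void processElement(Ride ride);
        abstract void onTimer(long rideId);
        void addTimer(long time, long rideId) {
            timers.computeIfAbsent(time, t -> new TreeSet<>()).add(rideId);
        }
        void deleteTimer(long time, long rideId) {
            TreeSet<Long> keys = timers.get(time);
            if (keys != null && keys.remove(rideId) && keys.isEmpty()) {
                timers.remove(time);
            }
        }
        // Fire every timer at or before the watermark, earliest first.
        void advanceWatermark(long watermark) {
            while (!timers.isEmpty() && timers.firstKey() <= watermark) {
                for (long rideId : timers.pollFirstEntry().getValue()) {
                    onTimer(rideId);
                }
            }
        }
    }

    // Previous solution: a START never overwrites saved state; every event registers a timer.
    static final class OldLogic extends Sim {
        @Override
        void processElement(Ride ride) {
            if (!ride.isStart || state.get(ride.rideId) == null) {
                state.put(ride.rideId, ride);
            }
            long eventTime = ride.isStart ? ride.startTime : ride.endTime;
            addTimer(eventTime + 2 * HOUR, ride.rideId);
        }
        @Override
        void onTimer(long rideId) {
            Ride saved = state.remove(rideId);
            if (saved != null && saved.isStart) alerts.add(saved);
        }
    }

    // New solution: keep whichever event arrives first; only a START gets a timer.
    static final class NewLogic extends Sim {
        @Override
        void processElement(Ride ride) {
            Ride previous = state.get(ride.rideId);
            if (previous == null) {
                state.put(ride.rideId, ride);
                if (ride.isStart) {
                    addTimer(ride.startTime + 2 * HOUR, ride.rideId);
                }
            } else {
                if (!ride.isStart) deleteTimer(previous.startTime + 2 * HOUR, ride.rideId);
                state.remove(ride.rideId);
            }
        }
        @Override
        void onTimer(long rideId) {
            alerts.add(state.remove(rideId)); // a START whose END never came
        }
    }

    // Same input order as startIsDelayedMoreThanTwoHours.
    static List<Ride> replay(Sim sim) {
        sim.processElement(new Ride(1, false, 0, HOUR)); // END after one hour
        sim.advanceWatermark(3 * HOUR);                  // watermark two hours after the END
        sim.processElement(new Ride(1, true, 0, 0));     // the late START
        sim.advanceWatermark(Long.MAX_VALUE);            // end of input fires remaining timers
        return sim.alerts;
    }
}
```

In the old logic, the END's timer (at three hours) fires first and clears the state, so the late START is stored as if it belonged to a new ride. Its timer, at two hours, is already behind the watermark and fires as soon as the watermark advances again, producing an alert for a ride that lasted one hour. In the new logic, the END is stored without a timer, and the START simply clears it.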