Posted to commits@flink.apache.org by da...@apache.org on 2020/08/25 17:50:02 UTC

[flink-training] branch master updated: [FLINK-18630] Redo the LongRideAlerts solutions to cover a corner case (#13)

This is an automated email from the ASF dual-hosted git repository.

danderson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
     new ea4a66e  [FLINK-18630] Redo the LongRideAlerts solutions to cover a corner case (#13)
ea4a66e is described below

commit ea4a66e97dd211bd8f8b8e415e3e427c30e4746b
Author: David Anderson <da...@alpinegizmo.com>
AuthorDate: Tue Aug 25 19:49:55 2020 +0200

    [FLINK-18630] Redo the LongRideAlerts solutions to cover a corner case (#13)
---
 long-ride-alerts/DISCUSSION.md                     | 16 +++++++++
 long-ride-alerts/README.md                         | 18 ++++++++--
 .../solutions/longrides/LongRidesSolution.java     | 41 ++++++++++++----------
 .../longrides/scala/LongRidesSolution.scala        | 40 +++++++++++----------
 .../exercises/longrides/LongRidesTest.java         | 10 ++++++
 5 files changed, 86 insertions(+), 39 deletions(-)

diff --git a/long-ride-alerts/DISCUSSION.md b/long-ride-alerts/DISCUSSION.md
index bf54c4f..f931ffa 100644
--- a/long-ride-alerts/DISCUSSION.md
+++ b/long-ride-alerts/DISCUSSION.md
@@ -21,7 +21,23 @@ under the License.
 
 (Discussion of [Lab: `ProcessFunction` and Timers (Long Ride Alerts)](./))
 
+It would be interesting to test that the solution does not leak state.
 
+A good way to write unit tests for a `KeyedProcessFunction` to check for state retention, etc., is to
+use the test harnesses described in the
+[documentation on testing](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators). 
+
+In fact, the reference solutions will leak state in the case where a START event is missing. They also
+leak in the case where the alert is generated, but then the END event does eventually arrive (after `onTimer()`
+has cleared the matching START event).
+
+This could be addressed either by using [state TTL](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl),
+or by using another timer that eventually
+clears any remaining state. There is a tradeoff here, however: once that state has been removed,
+then if the matching events are not actually missing, but are instead very, very late, they will cause erroneous alerts.
+
+This tradeoff between keeping state indefinitely versus occasionally getting things wrong when events are
+exceptionally late is a challenge that is inherent to stateful stream processing.
 
 -----
 
diff --git a/long-ride-alerts/README.md b/long-ride-alerts/README.md
index f6514a5..a5d2ac7 100644
--- a/long-ride-alerts/README.md
+++ b/long-ride-alerts/README.md
@@ -62,13 +62,25 @@ The resulting stream should be printed to standard out.
 <details>
 <summary><strong>Overall approach</strong></summary>
 
-This exercise revolves around using a `ProcessFunction` to manage some keyed state and event time timers, and doing so in a way that works even when the END event for a given `rideId` arrives before the START (which will happen). The challenge is figuring out what state to keep, and when to set and clear that state.
+This exercise revolves around using a `ProcessFunction` to manage some keyed state and event time timers, 
+and doing so in a way that works even when the END event for a given `rideId` arrives before the START (which can happen). 
+The challenge is figuring out what state to keep, and when to set and clear that state.
+You will want to use event time timers that fire two hours after an incoming START event, and in the `onTimer()` method, 
+collect START events to the output only if a matching END event hasn't yet arrived.
 </details>
 
 <details>
-<summary><strong>Timers and State</strong></summary>
+<summary><strong>State and timers</strong></summary>
 
-You will want to use event time timers that fire two hours after the incoming events, and in the `onTimer()` method, collect START events to the output only if a matching END event hasn't yet arrived. As for what state to keep, it is enough to remember the "last" event for each `rideId`, where "last" is based on event time and ride type (START vs END &mdash; yes, there are rides where the START and END have the same timestamp), rather than the order in which the events are processed. The [...]
+There are many possible solutions for this exercise, but in general it is enough to keep one
+`TaxiRide` in state (one `TaxiRide` for each key, or `rideId`). The approach used in the reference solution is to
+store whichever event arrives first (the START or the END), and if it's a START event,
+create a timer for two hours later. If and when the other event (for the same `rideId`) arrives,
+carefully clean things up.
+
+It is possible to arrange this so that if `onTimer()` is called, you are guaranteed that
+an alert (i.e., the ride kept in state) should be emitted. Writing the code this way conveniently
+puts all of the complex business logic together in one place (in the `processElement()` method).
 </details>
 
 ## Documentation
diff --git a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
index dc6ac2e..426e2db 100644
--- a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
+++ b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -65,41 +64,47 @@ public class LongRidesSolution extends ExerciseBase {
 	}
 
 	private static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> {
-		// keyed, managed state
-		// holds an END event if the ride has ended, otherwise a START event
+
 		private ValueState<TaxiRide> rideState;
 
 		@Override
 		public void open(Configuration config) {
-			ValueStateDescriptor<TaxiRide> startDescriptor =
-					new ValueStateDescriptor<>("saved ride", TaxiRide.class);
-			rideState = getRuntimeContext().getState(startDescriptor);
+			ValueStateDescriptor<TaxiRide> stateDescriptor =
+					new ValueStateDescriptor<>("ride event", TaxiRide.class);
+			rideState = getRuntimeContext().getState(stateDescriptor);
 		}
 
 		@Override
 		public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) throws Exception {
-			TimerService timerService = context.timerService();
+			TaxiRide previousRideEvent = rideState.value();
 
-			if (ride.isStart) {
-				// the matching END might have arrived first; don't overwrite it
-				if (rideState.value() == null) {
-					rideState.update(ride);
+			if (previousRideEvent == null) {
+				rideState.update(ride);
+				if (ride.isStart) {
+					context.timerService().registerEventTimeTimer(getTimerTime(ride));
 				}
 			} else {
-				rideState.update(ride);
+				if (!ride.isStart) {
+					// it's an END event, so event saved was the START event and has a timer
+					// the timer hasn't fired yet, and we can safely kill the timer
+					context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent));
+				}
+				// both events have now been seen, we can clear the state
+				rideState.clear();
 			}
-
-			timerService.registerEventTimeTimer(ride.getEventTime() + 120 * 60 * 1000);
 		}
 
 		@Override
 		public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) throws Exception {
-			TaxiRide savedRide = rideState.value();
-			if (savedRide != null && savedRide.isStart) {
-				out.collect(savedRide);
-			}
+
+			// if we get here, we know that the ride started two hours ago, and the END hasn't been processed
+			out.collect(rideState.value());
 			rideState.clear();
 		}
+
+		private long getTimerTime(TaxiRide ride) {
+			return ride.startTime.plusSeconds(120 * 60).toEpochMilli();
+		}
 	}
 
 }
diff --git a/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala b/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
index beff3eb..026e186 100644
--- a/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
+++ b/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.training.solutions.longrides.scala
 
+import scala.concurrent.duration._
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
@@ -57,40 +58,43 @@ object LongRidesSolution {
   }
 
   class MatchFunction extends KeyedProcessFunction[Long, TaxiRide, TaxiRide] {
-    // keyed, managed state
-    // holds an END event if the ride has ended, otherwise a START event
     lazy val rideState: ValueState[TaxiRide] = getRuntimeContext.getState(
-      new ValueStateDescriptor[TaxiRide]("saved ride", classOf[TaxiRide]))
+      new ValueStateDescriptor[TaxiRide]("ride event", classOf[TaxiRide]))
 
     override def processElement(ride: TaxiRide,
                                 context: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#Context,
                                 out: Collector[TaxiRide]): Unit = {
-      val timerService = context.timerService
 
-      if (ride.isStart) {
-        // the matching END might have arrived first; don't overwrite it
-        if (rideState.value() == null) {
-          rideState.update(ride)
-        }
-      }
-      else {
+      val previousRideEvent = rideState.value()
+
+      if (previousRideEvent == null) {
         rideState.update(ride)
+        if (ride.isStart) {
+          context.timerService().registerEventTimeTimer(getTimerTime(ride))
+        }
+      } else {
+        if (!ride.isStart) {
+          // it's an END event, so event saved was the START event and has a timer
+          // the timer hasn't fired yet, and we can safely kill the timer
+          context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent))
+        }
+        // both events have now been seen, we can clear the state
+        rideState.clear()
       }
-
-      timerService.registerEventTimeTimer(ride.getEventTime + 120 * 60 * 1000)
     }
 
     override def onTimer(timestamp: Long,
                          ctx: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#OnTimerContext,
                          out: Collector[TaxiRide]): Unit = {
-      val savedRide = rideState.value
-
-      if (savedRide != null && savedRide.isStart) {
-        out.collect(savedRide)
-      }
 
+      // if we get here, we know that the ride started two hours ago, and the END hasn't been processed
+      out.collect(rideState.value())
       rideState.clear()
     }
+
+    private def getTimerTime(ride: TaxiRide) = {
+      ride.startTime.toEpochMilli + 2.hours.toMillis
+    }
   }
 
 }
diff --git a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
index c0646c6..7d5a24e 100644
--- a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
+++ b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
@@ -88,6 +88,16 @@ public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
 		assertEquals(Collections.singletonList(rideStarted), results(source));
 	}
 
+	@Test
+	public void startIsDelayedMoreThanTwoHours() throws Exception {
+		TaxiRide rideStarted = startRide(1, BEGINNING);
+		TaxiRide rideEndedAfter1Hour = endRide(rideStarted, BEGINNING.plusSeconds(60 * 60));
+		Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 * 60).toEpochMilli();
+
+		TestRideSource source = new TestRideSource(rideEndedAfter1Hour, mark2HoursAfterEnd, rideStarted);
+		assert(results(source).isEmpty());
+	}
+
 	private TaxiRide testRide(long rideId, Boolean isStart, Instant startTime, Instant endTime) {
 		return new TaxiRide(rideId, isStart, startTime, endTime, -73.9947F, 40.750626F, -73.9947F, 40.750626F, (short) 1, 0, 0);
 	}
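
The corner case exercised by `startIsDelayedMoreThanTwoHours` can be traced without a Flink runtime. What follows is a minimal sketch of the new `processElement()` / `onTimer()` logic, not code from the commit. `LongRideSketch`, `RideEvent`, and `advanceWatermark` are hypothetical names chosen for illustration. The two `HashMap`s stand in for Flink's keyed `ValueState` and its timer service, and the watermark handling is a deliberate simplification of how event time timers fire.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of the matching logic in this commit.
class LongRideSketch {
    static final long TWO_HOURS_MS = 2L * 60 * 60 * 1000;

    // Minimal stand-in for TaxiRide: only the fields the logic needs.
    static final class RideEvent {
        final long rideId;
        final boolean isStart;
        final long startTimeMs;

        RideEvent(long rideId, boolean isStart, long startTimeMs) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.startTimeMs = startTimeMs;
        }
    }

    // Assumption: one map entry per key models the keyed ValueState<TaxiRide>.
    final Map<Long, RideEvent> rideState = new HashMap<>();
    // Assumption: at most one timer per rideId, as in the reference solution.
    final Map<Long, Long> timers = new HashMap<>();
    final List<RideEvent> alerts = new ArrayList<>();

    void processElement(RideEvent ride) {
        RideEvent previous = rideState.get(ride.rideId);
        if (previous == null) {
            // First event for this ride: remember it; only a START gets a timer.
            rideState.put(ride.rideId, ride);
            if (ride.isStart) {
                timers.put(ride.rideId, ride.startTimeMs + TWO_HOURS_MS);
            }
        } else {
            // Second event: if it is the END, the saved START still has a pending timer.
            if (!ride.isStart) {
                timers.remove(ride.rideId);
            }
            rideState.remove(ride.rideId);
        }
    }

    // Fires every timer whose timestamp is at or before the watermark (models onTimer()).
    void advanceWatermark(long watermark) {
        Iterator<Map.Entry<Long, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> timer = it.next();
            if (timer.getValue() <= watermark) {
                alerts.add(rideState.remove(timer.getKey()));
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        LongRideSketch sketch = new LongRideSketch();
        long beginning = 0L;
        sketch.processElement(new RideEvent(1, false, beginning)); // END arrives first
        sketch.advanceWatermark(beginning + 3 * 60 * 60 * 1000L);  // watermark passes the 3-hour mark
        sketch.processElement(new RideEvent(1, true, beginning));  // START arrives very late
        sketch.advanceWatermark(Long.MAX_VALUE);                   // end of stream
        System.out.println("alerts: " + sketch.alerts.size());
        System.out.println("retained state: " + sketch.rideState.size());
    }
}
```

Run as written, the sketch reports zero alerts and zero retained entries for the delayed-START scenario. Feeding it only an END event leaves one entry in `rideState`, which is the state leak described in the DISCUSSION.md changes.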