Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/15 17:35:10 UTC

[GitHub] azagrebin commented on a change in pull request #7036: [FLINK-10531][e2e] Fix unstable TTL end-to-end test.

URL: https://github.com/apache/flink/pull/7036#discussion_r233938203
 
 

 ##########
 File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
 ##########
 @@ -90,11 +107,83 @@ public static void main(String[] args) throws Exception {
 			.addSource(new TtlStateUpdateSource(keySpace, sleepAfterElements, sleepTime))
 			.name("TtlStateUpdateSource")
 			.keyBy(TtlStateUpdate::getKey)
-			.flatMap(new TtlVerifyUpdateFunction(ttlConfig, reportStatAfterUpdatesNum))
+			.flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, reportStatAfterUpdatesNum))
 			.name("TtlVerifyUpdateFunction")
 			.addSink(new PrintSinkFunction<>())
 			.name("PrintFailedVerifications");
 
 		env.execute("State TTL test job");
 	}
+
+	/**
+	 * Sets the state backend to a new {@link StubStateBackend} which has a {@link MonotonicTTLTimeProvider}.
+	 *
+	 * @param env The {@link StreamExecutionEnvironment} of the job.
+	 * @return The {@link MonotonicTTLTimeProvider}.
+	 */
+	private static MonotonicTTLTimeProvider setBackendWithCustomTTLTimeProvider(StreamExecutionEnvironment env) {
+		final MonotonicTTLTimeProvider ttlTimeProvider = new MonotonicTTLTimeProvider();
+
+		final StateBackend configuredBackend = env.getStateBackend();
+		final StateBackend stubBackend = new StubStateBackend(configuredBackend, ttlTimeProvider);
+		env.setStateBackend(stubBackend);
+
+		return ttlTimeProvider;
+	}
+
+	/**
+	 * A stub implementation of the {@link StateBackend} that allows the use of
+	 * a custom {@link TtlTimeProvider}.
+	 */
+	private static final class StubStateBackend implements StateBackend {
 
 Review comment:
   Let's also generate a UUID for `StubStateBackend`, `MonotonicTTLTimeProvider`, `TtlVerifyUpdateFunction`, `UpdateStat`, `AggregateFunction`, `TtlUpdateContext`, `TtlVerificationContext`, `ValueWithTs` and `ValueWithTs.Serializer`.
   
   I would also suggest creating a `StateBackendWrapperAdaptor` in the main code, alongside `StateBackend`, which would always wrap a `wrappedBackend` and forward all `StateBackend` interface methods to it. Here we would then need to override only the relevant methods. The adaptor could be reused in the future for cases similar to this one. If the `StateBackend` interface gets more methods, they would need default forwarding only in `StateBackendWrapperAdaptor`, and the classes extending it could stay untouched.
   Otherwise, I would suggest at least moving this class to a separate file.
   I leave these last ideas up to you.
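
The forwarding idea suggested above could look roughly like the sketch below. It deliberately does not use Flink's real `StateBackend` interface: `SimpleBackend`, `SimpleBackendWrapperAdaptor`, `SystemClockBackend` and `StubBackend` are hypothetical, simplified stand-ins invented for illustration, and the two interface methods are assumptions, not actual Flink signatures.

```java
import java.util.Objects;
import java.util.function.LongSupplier;

// Hypothetical, heavily simplified stand-in for a backend interface.
// These methods are NOT the real Flink StateBackend methods.
interface SimpleBackend {
    String name();
    long currentTtlTime();
}

// A plain implementation that a job might have configured.
final class SystemClockBackend implements SimpleBackend {
    @Override
    public String name() {
        return "system-clock-backend";
    }

    @Override
    public long currentTtlTime() {
        return System.currentTimeMillis();
    }
}

// Forwarding adaptor: every interface method delegates to the wrapped backend.
// If SimpleBackend gains a method, only this class needs a new forwarding method.
class SimpleBackendWrapperAdaptor implements SimpleBackend {
    protected final SimpleBackend wrappedBackend;

    protected SimpleBackendWrapperAdaptor(SimpleBackend wrappedBackend) {
        this.wrappedBackend = Objects.requireNonNull(wrappedBackend);
    }

    @Override
    public String name() {
        return wrappedBackend.name();
    }

    @Override
    public long currentTtlTime() {
        return wrappedBackend.currentTtlTime();
    }
}

// Test stub: overrides only the method it cares about (the time source)
// and inherits forwarding for everything else.
final class StubBackend extends SimpleBackendWrapperAdaptor {
    private final LongSupplier ttlTimeProvider;

    StubBackend(SimpleBackend wrappedBackend, LongSupplier ttlTimeProvider) {
        super(wrappedBackend);
        this.ttlTimeProvider = Objects.requireNonNull(ttlTimeProvider);
    }

    @Override
    public long currentTtlTime() {
        return ttlTimeProvider.getAsLong();
    }
}
```

With this shape, something like `new StubBackend(new SystemClockBackend(), () -> 42L)` would keep the wrapped backend's `name()` while reporting the injected time. Whether the adaptor should be abstract, and which methods it must forward, depends on the real interface.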
