You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/09 08:13:09 UTC

[flink] 01/05: [hotfix][cep] Change contract of cep TimerContext#timestamp to never return null

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e4648715f97338bb8ee0c8cc790bdbecc5b4422
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Jan 7 15:23:40 2019 +0100

    [hotfix][cep] Change contract of cep TimerContext#timestamp to never
    return null
---
 .../org/apache/flink/cep/context/TimerContext.java |  6 ++--
 .../adaptors/PatternTimeoutFlatSelectAdapter.java  |  3 +-
 .../adaptors/PatternTimeoutSelectAdapter.java      |  3 +-
 .../org/apache/flink/cep/operator/CepOperator.java | 11 +++----
 .../operator/CepProcessFunctionContextTest.java    | 35 ++++++++++++----------
 5 files changed, 28 insertions(+), 30 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
index 23367cd..4f0eab2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
@@ -32,10 +32,10 @@ public interface TimerContext {
 	/**
 	 * Timestamp of the element currently being processed.
 	 *
-	 * <p>This might be {@code null}, for example if the time characteristic of your program
-	 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
+	 * time when the event entered the CEP operator.
 	 */
-	Long timestamp();
+	long timestamp();
 
 	/** Returns the current processing time. */
 	long currentProcessingTime();
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
index ab9c97d..13d04bd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
@@ -78,8 +78,7 @@ public class PatternTimeoutFlatSelectAdapter<IN, OUT, T>
 			Map<String, List<IN>> match,
 			Context ctx) throws Exception {
 		sideCollector.setCtx(ctx);
-		long timestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
-		flatTimeoutFunction.timeout(match, timestamp, sideCollector);
+		flatTimeoutFunction.timeout(match, ctx.timestamp(), sideCollector);
 	}
 
 	/**
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
index 29b0cf7..81999a8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
@@ -71,8 +71,7 @@ public class PatternTimeoutSelectAdapter<IN, OUT, T>
 			final Map<String, List<IN>> match,
 			final Context ctx) throws Exception {
 
-		final long resultTimestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
-		final T timedOutPatternResult = timeoutFunction.timeout(match, resultTimestamp);
+		final T timedOutPatternResult = timeoutFunction.timeout(match, ctx.timestamp());
 
 		ctx.output(timedOutPartialMatchesTag, timedOutPatternResult);
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index 6df1fa0..262d89d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -459,8 +459,9 @@ public class CepOperator<IN, KEY, OUT>
 	private void setTimestamp(long timestamp) {
 		if (!isProcessingTime) {
 			collector.setAbsoluteTimestamp(timestamp);
-			context.setTimestamp(timestamp);
 		}
+
+		context.setTimestamp(timestamp);
 	}
 
 	/**
@@ -493,12 +494,8 @@ public class CepOperator<IN, KEY, OUT>
 		}
 
 		@Override
-		public Long timestamp() {
-			if (isProcessingTime) {
-				return null;
-			} else {
-				return timestamp;
-			}
+		public long timestamp() {
+			return timestamp;
 		}
 
 		@Override
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
index 0947748..baefe84 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
@@ -45,8 +45,8 @@ import static org.apache.flink.cep.utils.OutputAsserter.assertOutput;
  */
 public class CepProcessFunctionContextTest extends TestLogger {
 
-	private static final boolean PROCESSING_TIME = false;
-	private static final boolean EVENT_TIME = true;
+	private static final boolean PROCESSING_TIME = true;
+	private static final boolean EVENT_TIME = false;
 
 	@Test
 	public void testTimestampPassingInEventTime() throws Exception {
@@ -56,7 +56,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(1),
 					new NFAForwardingFactory(),
-					PROCESSING_TIME))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			// events out of order to test if internal sorting does not mess up the timestamps
@@ -81,15 +81,18 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(1),
 					new NFAForwardingFactory(),
-					EVENT_TIME))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
+			harness.setProcessingTime(1);
 			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.setProcessingTime(2);
 			harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
+			harness.setProcessingTime(3);
 
 			assertOutput(harness.getOutput())
-				.nextElementEquals("(NO_TIMESTAMP):A")
-				.nextElementEquals("(NO_TIMESTAMP):B")
+				.nextElementEquals("1:A")
+				.nextElementEquals("2:B")
 				.hasNoMoreElements();
 		}
 	}
@@ -102,7 +105,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(1),
 					new NFAForwardingFactory(),
-					EVENT_TIME))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(15);
@@ -125,7 +128,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(1),
 					new NFAForwardingFactory(),
-					PROCESSING_TIME))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(10);
@@ -150,7 +153,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(2, timedOut),
 					new NFATimingOutFactory(),
-					PROCESSING_TIME))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			// events out of order to test if internal sorting does not mess up the timestamps
@@ -181,7 +184,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(2, timedOut),
 					new NFATimingOutFactory(),
-					EVENT_TIME))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(3);
@@ -192,11 +195,11 @@ public class CepProcessFunctionContextTest extends TestLogger {
 			harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord());
 
 			assertOutput(harness.getOutput())
-				.nextElementEquals("(NO_TIMESTAMP):A:C")
+				.nextElementEquals("5:A:C")
 				.hasNoMoreElements();
 
 			assertOutput(harness.getSideOutput(timedOut))
-				.nextElementEquals("(NO_TIMESTAMP):C")
+				.nextElementEquals("15:C")
 				.hasNoMoreElements();
 		}
 	}
@@ -211,7 +214,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(2, sideOutputTag),
 					new NFATimingOutFactory(),
-					PROCESSING_TIME))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			// events out of order to test if internal sorting does not mess up the timestamps
@@ -243,7 +246,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(2, sideOutputTag),
 					new NFATimingOutFactory(),
-					EVENT_TIME))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(3);
@@ -290,7 +293,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 	 */
 	private static PatternProcessFunction<Event, String> extractTimestampAndNames(int stateNumber) {
 		return new AccessContextWithNames(stateNumber,
-			context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+			context -> String.valueOf(context.timestamp()));
 	}
 
 	/**
@@ -310,7 +313,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 			OutputTag<String> timedOutTag) {
 		return new AccessContextWithNamesWithTimedOut(stateNumber,
 			timedOutTag,
-			context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+			context -> String.valueOf(context.timestamp()));
 	}
 
 	/**