You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/09 08:13:09 UTC
[flink] 01/05: [hotfix][cep] Change contract of cep
TimerContext#timestamp to never return null
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5e4648715f97338bb8ee0c8cc790bdbecc5b4422
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Jan 7 15:23:40 2019 +0100
[hotfix][cep] Change contract of cep TimerContext#timestamp to never
return null
---
.../org/apache/flink/cep/context/TimerContext.java | 6 ++--
.../adaptors/PatternTimeoutFlatSelectAdapter.java | 3 +-
.../adaptors/PatternTimeoutSelectAdapter.java | 3 +-
.../org/apache/flink/cep/operator/CepOperator.java | 11 +++----
.../operator/CepProcessFunctionContextTest.java | 35 ++++++++++++----------
5 files changed, 28 insertions(+), 30 deletions(-)
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
index 23367cd..4f0eab2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
@@ -32,10 +32,10 @@ public interface TimerContext {
/**
* Timestamp of the element currently being processed.
*
- * <p>This might be {@code null}, for example if the time characteristic of your program
- * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+ * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
+ * time when the event entered the CEP operator.
*/
- Long timestamp();
+ long timestamp();
/** Returns the current processing time. */
long currentProcessingTime();
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
index ab9c97d..13d04bd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
@@ -78,8 +78,7 @@ public class PatternTimeoutFlatSelectAdapter<IN, OUT, T>
Map<String, List<IN>> match,
Context ctx) throws Exception {
sideCollector.setCtx(ctx);
- long timestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
- flatTimeoutFunction.timeout(match, timestamp, sideCollector);
+ flatTimeoutFunction.timeout(match, ctx.timestamp(), sideCollector);
}
/**
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
index 29b0cf7..81999a8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
@@ -71,8 +71,7 @@ public class PatternTimeoutSelectAdapter<IN, OUT, T>
final Map<String, List<IN>> match,
final Context ctx) throws Exception {
- final long resultTimestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
- final T timedOutPatternResult = timeoutFunction.timeout(match, resultTimestamp);
+ final T timedOutPatternResult = timeoutFunction.timeout(match, ctx.timestamp());
ctx.output(timedOutPartialMatchesTag, timedOutPatternResult);
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index 6df1fa0..262d89d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -459,8 +459,9 @@ public class CepOperator<IN, KEY, OUT>
private void setTimestamp(long timestamp) {
if (!isProcessingTime) {
collector.setAbsoluteTimestamp(timestamp);
- context.setTimestamp(timestamp);
}
+
+ context.setTimestamp(timestamp);
}
/**
@@ -493,12 +494,8 @@ public class CepOperator<IN, KEY, OUT>
}
@Override
- public Long timestamp() {
- if (isProcessingTime) {
- return null;
- } else {
- return timestamp;
- }
+ public long timestamp() {
+ return timestamp;
}
@Override
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
index 0947748..baefe84 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
@@ -45,8 +45,8 @@ import static org.apache.flink.cep.utils.OutputAsserter.assertOutput;
*/
public class CepProcessFunctionContextTest extends TestLogger {
- private static final boolean PROCESSING_TIME = false;
- private static final boolean EVENT_TIME = true;
+ private static final boolean PROCESSING_TIME = true;
+ private static final boolean EVENT_TIME = false;
@Test
public void testTimestampPassingInEventTime() throws Exception {
@@ -56,7 +56,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractTimestampAndNames(1),
new NFAForwardingFactory(),
- PROCESSING_TIME))) {
+ EVENT_TIME))) {
harness.open();
// events out of order to test if internal sorting does not mess up the timestamps
@@ -81,15 +81,18 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractTimestampAndNames(1),
new NFAForwardingFactory(),
- EVENT_TIME))) {
+ PROCESSING_TIME))) {
harness.open();
+ harness.setProcessingTime(1);
harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+ harness.setProcessingTime(2);
harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
+ harness.setProcessingTime(3);
assertOutput(harness.getOutput())
- .nextElementEquals("(NO_TIMESTAMP):A")
- .nextElementEquals("(NO_TIMESTAMP):B")
+ .nextElementEquals("1:A")
+ .nextElementEquals("2:B")
.hasNoMoreElements();
}
}
@@ -102,7 +105,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractCurrentProcessingTimeAndNames(1),
new NFAForwardingFactory(),
- EVENT_TIME))) {
+ PROCESSING_TIME))) {
harness.open();
harness.setProcessingTime(15);
@@ -125,7 +128,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractCurrentProcessingTimeAndNames(1),
new NFAForwardingFactory(),
- PROCESSING_TIME))) {
+ EVENT_TIME))) {
harness.open();
harness.setProcessingTime(10);
@@ -150,7 +153,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractTimestampAndNames(2, timedOut),
new NFATimingOutFactory(),
- PROCESSING_TIME))) {
+ EVENT_TIME))) {
harness.open();
// events out of order to test if internal sorting does not mess up the timestamps
@@ -181,7 +184,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractTimestampAndNames(2, timedOut),
new NFATimingOutFactory(),
- EVENT_TIME))) {
+ PROCESSING_TIME))) {
harness.open();
harness.setProcessingTime(3);
@@ -192,11 +195,11 @@ public class CepProcessFunctionContextTest extends TestLogger {
harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord());
assertOutput(harness.getOutput())
- .nextElementEquals("(NO_TIMESTAMP):A:C")
+ .nextElementEquals("5:A:C")
.hasNoMoreElements();
assertOutput(harness.getSideOutput(timedOut))
- .nextElementEquals("(NO_TIMESTAMP):C")
+ .nextElementEquals("15:C")
.hasNoMoreElements();
}
}
@@ -211,7 +214,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractCurrentProcessingTimeAndNames(2, sideOutputTag),
new NFATimingOutFactory(),
- PROCESSING_TIME))) {
+ EVENT_TIME))) {
harness.open();
// events out of order to test if internal sorting does not mess up the timestamps
@@ -243,7 +246,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
createCepOperator(
extractCurrentProcessingTimeAndNames(2, sideOutputTag),
new NFATimingOutFactory(),
- EVENT_TIME))) {
+ PROCESSING_TIME))) {
harness.open();
harness.setProcessingTime(3);
@@ -290,7 +293,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
*/
private static PatternProcessFunction<Event, String> extractTimestampAndNames(int stateNumber) {
return new AccessContextWithNames(stateNumber,
- context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+ context -> String.valueOf(context.timestamp()));
}
/**
@@ -310,7 +313,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
OutputTag<String> timedOutTag) {
return new AccessContextWithNamesWithTimedOut(stateNumber,
timedOutTag,
- context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+ context -> String.valueOf(context.timestamp()));
}
/**