Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/19 10:52:58 UTC

[GitHub] [flink] aljoscha commented on a change in pull request #13676: [FLINK-19326][cep] Allow explicitly configuring time behaviour on CEP PatternStream

aljoscha commented on a change in pull request #13676:
URL: https://github.com/apache/flink/pull/13676#discussion_r507652668



##########
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
##########
@@ -52,7 +52,48 @@
 			DataStream<T> input,
 			Pattern<T, ?> pattern,
 			EventComparator<T> comparator) {
-		final PatternStream<T> stream = new PatternStream<>(input, pattern);
-		return stream.withComparator(comparator);
+		return new PatternStream<>(input, pattern).withComparator(comparator);
+	}
+
+	/**
+	 * Creates a {@link PatternStream} from an input data stream and a pattern.
+	 *
+	 * @param input DataStream containing the input events
+	 * @param pattern Pattern specification which shall be detected
+	 * @param isProcessingTime Time behaviour to specify processing time or event time
+	 * @param <T> Type of the input events
+	 * @return Resulting pattern stream
+	 */
+	public static <T> PatternStream<T> pattern(
+			DataStream<T> input,
+			Pattern<T, ?> pattern,
+			boolean isProcessingTime) {

Review comment:
       I think we don't need to expose an additional parameter, because users can already call `inProcessingTime()` and `inEventTime()`, right?

Review comment:
       It's better to just have one way of doing things. Keeps the API simpler. What do you think?

##########
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
##########
@@ -60,14 +59,28 @@
 	 */
 	private final OutputTag<IN> lateDataOutputTag;
 
+	private boolean isProcessingTime;

Review comment:
       I think it would be better to introduce an enum for this, something like `TimeCharacteristic` with the values `PROCESSING_TIME` and `EVENT_TIME`. I find it's often hard to see what `true` or `false` means when you just read a method call.
   
   This should be internal, though, and the only API for users should be `inEventTime()` and `inProcessingTime()`.
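
To illustrate the suggestion above, here is a minimal, self-contained sketch of the shape being proposed: an internal enum in place of the `boolean` field, with `inEventTime()` and `inProcessingTime()` as the only user-facing switches. Everything except those two method names and the enum constants is an assumption made for illustration: `TimeBehaviourSketch` and `SketchPatternStream` are hypothetical stand-ins and not Flink classes, the `TimeCharacteristic` enum is declared locally in the sketch, and the event-time default is chosen arbitrarily.

```java
import java.util.Objects;

public class TimeBehaviourSketch {

    /** Internal enum replacing the bare boolean flag (declared locally for this sketch). */
    enum TimeCharacteristic {
        PROCESSING_TIME,
        EVENT_TIME
    }

    /** Hypothetical stand-in for a pattern stream; it only carries the time setting. */
    static final class SketchPatternStream {

        private final TimeCharacteristic timeCharacteristic;

        private SketchPatternStream(TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = Objects.requireNonNull(timeCharacteristic);
        }

        /** Assumption of this sketch: event time is the default. */
        static SketchPatternStream create() {
            return new SketchPatternStream(TimeCharacteristic.EVENT_TIME);
        }

        /** User-facing switch; the enum itself never appears in a public signature. */
        SketchPatternStream inProcessingTime() {
            return new SketchPatternStream(TimeCharacteristic.PROCESSING_TIME);
        }

        /** User-facing switch back to event time. */
        SketchPatternStream inEventTime() {
            return new SketchPatternStream(TimeCharacteristic.EVENT_TIME);
        }

        /** Accessor for internal callers, e.g. a builder that picks the operator. */
        TimeCharacteristic timeCharacteristic() {
            return timeCharacteristic;
        }
    }

    public static void main(String[] args) {
        // The call site names the behaviour instead of passing true/false.
        SketchPatternStream stream = SketchPatternStream.create().inProcessingTime();
        System.out.println(stream.timeCharacteristic());
    }
}
```

Compared with a call such as `pattern(input, pattern, true)`, the chained `inProcessingTime()` call states its intent at the call site, and the enum stays an implementation detail that can change without touching the public API.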



