You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/12/30 14:16:35 UTC

[flink] 08/08: [hotfix][cep] Made PatternStream immutable & changed PatternStreamBuilder to a proper builder

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54fea6072daf2f9868d34994c77ae16930e48f16
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Dec 19 10:10:39 2018 +0100

    [hotfix][cep] Made PatternStream immutable & changed PatternStreamBuilder to a proper builder
---
 .../org/apache/flink/cep/scala/PatternStream.scala |  10 +-
 .../src/main/java/org/apache/flink/cep/CEP.java    |   3 +-
 .../java/org/apache/flink/cep/PatternStream.java   | 133 ++++++++-------------
 .../org/apache/flink/cep/PatternStreamBuilder.java |  79 ++++++++----
 4 files changed, 107 insertions(+), 118 deletions(-)

diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index dd17ea6..d7e7d3f 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -21,9 +21,7 @@ import java.util.{UUID, List => JList, Map => JMap}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.cep.functions.PatternProcessFunction
-import org.apache.flink.cep.pattern.{Pattern => JPattern}
-import org.apache.flink.cep.scala.pattern.Pattern
-import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
 import org.apache.flink.streaming.api.scala.{asScalaStream, _}
 import org.apache.flink.util.Collector
 
@@ -42,12 +40,6 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
   private[flink] def wrappedPatternStream = jPatternStream
 
-  def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
-
-  def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream)
-
-  def getComparator: EventComparator[T] = jPatternStream.getComparator
-
   /**
     * Applies a process function to the detected pattern sequence. For each pattern sequence the
     * provided [[PatternProcessFunction]] is called.
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
index b7cb391..3774879 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -52,6 +52,7 @@ public class CEP {
 			DataStream<T> input,
 			Pattern<T, ?> pattern,
 			EventComparator<T> comparator) {
-		return new PatternStream<>(input, pattern, comparator);
+		final PatternStream<T> stream = new PatternStream<>(input, pattern);
+		return stream.withComparator(comparator);
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 40508fe..ca918a9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -36,6 +35,7 @@ import java.util.UUID;
 
 import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromFlatSelect;
 import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromSelect;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
@@ -50,42 +50,22 @@ import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromSelect;
  */
 public class PatternStream<T> {
 
-	// underlying data stream
-	private final DataStream<T> inputStream;
+	private final PatternStreamBuilder<T> builder;
 
-	private final Pattern<T, ?> pattern;
-
-	// comparator to sort events
-	private final EventComparator<T> comparator;
-
-	/**
-	 * Side output {@code OutputTag} for late data. If no tag is set late data will be simply
-	 * dropped.
-	 */
-	private OutputTag<T> lateDataOutputTag;
-
-	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
-		this.inputStream = inputStream;
-		this.pattern = pattern;
-		this.comparator = null;
-	}
-
-	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
-		this.inputStream = inputStream;
-		this.pattern = pattern;
-		this.comparator = comparator;
+	private PatternStream(final PatternStreamBuilder<T> builder) {
+		this.builder = checkNotNull(builder);
 	}
 
-	public Pattern<T, ?> getPattern() {
-		return pattern;
+	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
+		this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));
 	}
 
-	public DataStream<T> getInputStream() {
-		return inputStream;
+	PatternStream<T> withComparator(final EventComparator<T> comparator) {
+		return new PatternStream<>(builder.withComparator(comparator));
 	}
 
-	public EventComparator<T> getComparator() {
-		return comparator;
+	public PatternStream<T> sideOutputLateData(OutputTag<T> lateDataOutputTag) {
+		return new PatternStream<>(builder.withLateDataOutputTag(lateDataOutputTag));
 	}
 
 	/**
@@ -100,16 +80,13 @@ public class PatternStream<T> {
 	 *         function.
 	 */
 	public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction) {
-		// we have to extract the output type from the provided pattern selection function manually
-		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
-
 		final TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
 			patternProcessFunction,
-			PatternSelectFunction.class,
+			PatternProcessFunction.class,
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
@@ -132,13 +109,9 @@ public class PatternStream<T> {
 			final PatternProcessFunction<T, R> patternProcessFunction,
 			final TypeInformation<R> outTypeInfo) {
 
-		return PatternStreamBuilder.createPatternStream(
-			inputStream,
-			pattern,
+		return builder.build(
 			outTypeInfo,
-			comparator,
-			lateDataOutputTag,
-			clean(patternProcessFunction));
+			builder.clean(patternProcessFunction));
 	}
 
 	/**
@@ -162,22 +135,13 @@ public class PatternStream<T> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
 		return select(patternSelectFunction, returnType);
 	}
 
-	/**
-	 * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
-	 * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
-	 *
-	 * @return The cleaned Function
-	 */
-	private  <F> F clean(F f) {
-		return inputStream.getExecutionEnvironment().clean(f);
-	}
 
 	/**
 	 * Applies a select function to the detected pattern sequence. For each pattern sequence the
@@ -196,7 +160,7 @@ public class PatternStream<T> {
 			final TypeInformation<R> outTypeInfo) {
 
 		final PatternProcessFunction<T, R> processFunction =
-			fromSelect(clean(patternSelectFunction)).build();
+			fromSelect(builder.clean(patternSelectFunction)).build();
 
 		return process(processFunction, outTypeInfo);
 	}
@@ -236,7 +200,7 @@ public class PatternStream<T> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
@@ -279,8 +243,8 @@ public class PatternStream<T> {
 			final PatternSelectFunction<T, R> patternSelectFunction) {
 
 		final PatternProcessFunction<T, R> processFunction =
-			fromSelect(clean(patternSelectFunction))
-				.withTimeoutHandler(timedOutPartialMatchesTag, clean(patternTimeoutFunction))
+			fromSelect(builder.clean(patternSelectFunction))
+				.withTimeoutHandler(timedOutPartialMatchesTag, builder.clean(patternTimeoutFunction))
 				.build();
 
 		return process(processFunction, outTypeInfo);
@@ -319,7 +283,7 @@ public class PatternStream<T> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
@@ -329,7 +293,7 @@ public class PatternStream<T> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
@@ -338,17 +302,17 @@ public class PatternStream<T> {
 		final OutputTag<L> outputTag = new OutputTag<>(UUID.randomUUID().toString(), timeoutTypeInfo);
 
 		final PatternProcessFunction<T, R> processFunction =
-				fromSelect(clean(patternSelectFunction))
-						.withTimeoutHandler(outputTag, clean(patternTimeoutFunction))
-						.build();
+			fromSelect(builder.clean(patternSelectFunction))
+				.withTimeoutHandler(outputTag, builder.clean(patternTimeoutFunction))
+				.build();
 
 		final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
 		final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
 
 		return mainStream
-				.connect(timedOutStream)
-				.map(new CoMapTimeout<>())
-				.returns(outTypeInfo);
+			.connect(timedOutStream)
+			.map(new CoMapTimeout<>())
+			.returns(outTypeInfo);
 	}
 
 	/**
@@ -362,17 +326,17 @@ public class PatternStream<T> {
 	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
 	 *         function.
 	 */
-	public <R> SingleOutputStreamOperator<R> flatSelect(
-			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+	public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 		// we have to extract the output type from the provided pattern selection function manually
 		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
+
 		final TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
 			0,
 			1,
-			new int[] {1, 0},
-			inputStream.getType(),
+			new int[]{1, 0},
+			builder.getInputType(),
 			null,
 			false);
 
@@ -396,7 +360,8 @@ public class PatternStream<T> {
 			final TypeInformation<R> outTypeInfo) {
 
 		final PatternProcessFunction<T, R> processFunction =
-			fromFlatSelect(clean(patternFlatSelectFunction)).build();
+			fromFlatSelect(builder.clean(patternFlatSelectFunction))
+				.build();
 
 		return process(processFunction, outTypeInfo);
 	}
@@ -436,11 +401,12 @@ public class PatternStream<T> {
 			0,
 			1,
 			new int[]{1, 0},
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
-		return flatSelect(timedOutPartialMatchesTag,
+		return flatSelect(
+			timedOutPartialMatchesTag,
 			patternFlatTimeoutFunction,
 			rightTypeInfo,
 			patternFlatSelectFunction);
@@ -478,8 +444,8 @@ public class PatternStream<T> {
 			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 
 		final PatternProcessFunction<T, R> processFunction =
-			fromFlatSelect(clean(patternFlatSelectFunction))
-				.withTimeoutHandler(timedOutPartialMatchesTag, clean(patternFlatTimeoutFunction))
+			fromFlatSelect(builder.clean(patternFlatSelectFunction))
+				.withTimeoutHandler(timedOutPartialMatchesTag, builder.clean(patternFlatTimeoutFunction))
 				.build();
 
 		return process(processFunction, outTypeInfo);
@@ -519,7 +485,7 @@ public class PatternStream<T> {
 			0,
 			1,
 			new int[]{2, 0},
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
@@ -529,15 +495,15 @@ public class PatternStream<T> {
 			0,
 			1,
 			new int[]{1, 0},
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
 		final OutputTag<L> outputTag = new OutputTag<>(UUID.randomUUID().toString(), timedOutTypeInfo);
 
 		final PatternProcessFunction<T, R> processFunction =
-			fromFlatSelect(clean(patternFlatSelectFunction))
-				.withTimeoutHandler(outputTag, clean(patternFlatTimeoutFunction))
+			fromFlatSelect(builder.clean(patternFlatSelectFunction))
+				.withTimeoutHandler(outputTag, builder.clean(patternFlatTimeoutFunction))
 				.build();
 
 		final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
@@ -545,14 +511,9 @@ public class PatternStream<T> {
 		final TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(timedOutTypeInfo, mainTypeInfo);
 
 		return mainStream
-			.connect(timedOutStream)
-			.map(new CoMapTimeout<>())
-			.returns(outTypeInfo);
-	}
-
-	public PatternStream<T> sideOutputLateData(OutputTag<T> lateDataOutputTag) {
-		this.lateDataOutputTag = clean(lateDataOutputTag);
-		return this;
+				.connect(timedOutStream)
+				.map(new CoMapTimeout<>())
+				.returns(outTypeInfo);
 	}
 
 	/**
@@ -564,12 +525,12 @@ public class PatternStream<T> {
 		private static final long serialVersionUID = 2059391566945212552L;
 
 		@Override
-		public Either<L, R> map1(R value) throws Exception {
+		public Either<L, R> map1(R value) {
 			return Either.Right(value);
 		}
 
 		@Override
-		public Either<L, R> map2(L value) throws Exception {
+		public Either<L, R> map2(L value) {
 			return Either.Left(value);
 		}
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
index a2c430a..13d68e2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -45,46 +46,77 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utility method for creating {@link PatternStream}.
  */
 @Internal
-final class PatternStreamBuilder {
+final class PatternStreamBuilder<IN> {
+
+	private final DataStream<IN> inputStream;
+
+	private final Pattern<IN, ?> pattern;
+
+	private final EventComparator<IN> comparator;
+
+	/**
+	 * Side output {@code OutputTag} for late data.
+	 * If no tag is set, late data will simply be dropped.
+	 */
+	private final OutputTag<IN> lateDataOutputTag;
+
+	private PatternStreamBuilder(
+			final DataStream<IN> inputStream,
+			final Pattern<IN, ?> pattern,
+			@Nullable final EventComparator<IN> comparator,
+			@Nullable final OutputTag<IN> lateDataOutputTag) {
+
+		this.inputStream = checkNotNull(inputStream);
+		this.pattern = checkNotNull(pattern);
+		this.comparator = comparator;
+		this.lateDataOutputTag = lateDataOutputTag;
+	}
+
+	TypeInformation<IN> getInputType() {
+		return inputStream.getType();
+	}
+
+	/**
+	 * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
+	 * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
+	 *
+	 * @return The cleaned Function
+	 */
+	<F> F clean(F f) {
+		return inputStream.getExecutionEnvironment().clean(f);
+	}
+
+	PatternStreamBuilder<IN> withComparator(final EventComparator<IN> comparator) {
+		return new PatternStreamBuilder<>(inputStream, pattern, checkNotNull(comparator), lateDataOutputTag);
+	}
+
+	PatternStreamBuilder<IN> withLateDataOutputTag(final OutputTag<IN> lateDataOutputTag) {
+		return new PatternStreamBuilder<>(inputStream, pattern, comparator, checkNotNull(lateDataOutputTag));
+	}
 
 	/**
 	 * Creates a data stream containing results of {@link PatternProcessFunction} to fully matching event patterns.
 	 *
-	 * @param inputStream stream of input events
-	 * @param pattern pattern to be search for in the stream
 	 * @param processFunction function to be applied to matching event sequences
 	 * @param outTypeInfo output TypeInformation of
 	 *        {@link PatternProcessFunction#processMatch(Map, PatternProcessFunction.Context, Collector)}
-	 * @param <IN> type of input events
 	 * @param <OUT> type of output events
 	 * @return Data stream containing fully matched event sequence with applied {@link PatternProcessFunction}
 	 */
-	static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
+	<OUT, K> SingleOutputStreamOperator<OUT> build(
 			final TypeInformation<OUT> outTypeInfo,
-			@Nullable final EventComparator<IN> comparator,
-			@Nullable final OutputTag<IN> lateDataOutputTag,
 			final PatternProcessFunction<IN, OUT> processFunction) {
 
-		checkNotNull(inputStream);
-		checkNotNull(pattern);
 		checkNotNull(outTypeInfo);
 		checkNotNull(processFunction);
 
-		final TypeSerializer<IN> inputSerializer =
-				inputStream.getType().createSerializer(inputStream.getExecutionConfig());
-
-		// check whether we use processing time
-		final boolean isProcessingTime =
-			inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+		final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
+		final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
 
 		final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
-
-		// compile our pattern into a NFAFactory to instantiate NFAs later on
 		final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
 
-		CepOperator<IN, K, OUT> operator = new CepOperator<>(
+		final CepOperator<IN, K, OUT> operator = new CepOperator<>(
 			inputSerializer,
 			isProcessingTime,
 			nfaFactory,
@@ -108,12 +140,15 @@ final class PatternStreamBuilder {
 				"GlobalCepOperator",
 				outTypeInfo,
 				operator
-				).forceNonParallel();
+			).forceNonParallel();
 		}
 
 		return patternStream;
 	}
 
-	private PatternStreamBuilder() {
+	// ---------------------------------------- factory-like methods ---------------------------------------- //
+
+	static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
+		return new PatternStreamBuilder<>(inputStream, pattern, null, null);
 	}
 }