You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/12/30 14:16:35 UTC
[flink] 08/08: [hotfix][cep] Made PatternStream immutable & changed PatternStreamBuilder to a proper builder
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 54fea6072daf2f9868d34994c77ae16930e48f16
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Dec 19 10:10:39 2018 +0100
[hotfix][cep] Made PatternStream immutable & changed PatternStreamBuilder to a proper builder
---
.../org/apache/flink/cep/scala/PatternStream.scala | 10 +-
.../src/main/java/org/apache/flink/cep/CEP.java | 3 +-
.../java/org/apache/flink/cep/PatternStream.java | 133 ++++++++-------------
.../org/apache/flink/cep/PatternStreamBuilder.java | 79 ++++++++----
4 files changed, 107 insertions(+), 118 deletions(-)
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index dd17ea6..d7e7d3f 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -21,9 +21,7 @@ import java.util.{UUID, List => JList, Map => JMap}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.functions.PatternProcessFunction
-import org.apache.flink.cep.pattern.{Pattern => JPattern}
-import org.apache.flink.cep.scala.pattern.Pattern
-import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
import org.apache.flink.streaming.api.scala.{asScalaStream, _}
import org.apache.flink.util.Collector
@@ -42,12 +40,6 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
private[flink] def wrappedPatternStream = jPatternStream
- def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
-
- def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream)
-
- def getComparator: EventComparator[T] = jPatternStream.getComparator
-
/**
* Applies a process function to the detected pattern sequence. For each pattern sequence the
* provided [[PatternProcessFunction]] is called.
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
index b7cb391..3774879 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -52,6 +52,7 @@ public class CEP {
DataStream<T> input,
Pattern<T, ?> pattern,
EventComparator<T> comparator) {
- return new PatternStream<>(input, pattern, comparator);
+ final PatternStream<T> stream = new PatternStream<>(input, pattern);
+ return stream.withComparator(comparator);
}
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 40508fe..ca918a9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -19,7 +19,6 @@
package org.apache.flink.cep;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -36,6 +35,7 @@ import java.util.UUID;
import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromFlatSelect;
import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromSelect;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
@@ -50,42 +50,22 @@ import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromSelect;
*/
public class PatternStream<T> {
- // underlying data stream
- private final DataStream<T> inputStream;
+ private final PatternStreamBuilder<T> builder;
- private final Pattern<T, ?> pattern;
-
- // comparator to sort events
- private final EventComparator<T> comparator;
-
- /**
- * Side output {@code OutputTag} for late data. If no tag is set late data will be simply
- * dropped.
- */
- private OutputTag<T> lateDataOutputTag;
-
- PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
- this.inputStream = inputStream;
- this.pattern = pattern;
- this.comparator = null;
- }
-
- PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
- this.inputStream = inputStream;
- this.pattern = pattern;
- this.comparator = comparator;
+ private PatternStream(final PatternStreamBuilder<T> builder) {
+ this.builder = checkNotNull(builder);
}
- public Pattern<T, ?> getPattern() {
- return pattern;
+ PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
+ this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));
}
- public DataStream<T> getInputStream() {
- return inputStream;
+ PatternStream<T> withComparator(final EventComparator<T> comparator) {
+ return new PatternStream<>(builder.withComparator(comparator));
}
- public EventComparator<T> getComparator() {
- return comparator;
+ public PatternStream<T> sideOutputLateData(OutputTag<T> lateDataOutputTag) {
+ return new PatternStream<>(builder.withLateDataOutputTag(lateDataOutputTag));
}
/**
@@ -100,16 +80,13 @@ public class PatternStream<T> {
* function.
*/
public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction) {
- // we have to extract the output type from the provided pattern selection function manually
- // because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
-
final TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
patternProcessFunction,
- PatternSelectFunction.class,
+ PatternProcessFunction.class,
0,
1,
TypeExtractor.NO_INDEX,
- inputStream.getType(),
+ builder.getInputType(),
null,
false);
@@ -132,13 +109,9 @@ public class PatternStream<T> {
final PatternProcessFunction<T, R> patternProcessFunction,
final TypeInformation<R> outTypeInfo) {
- return PatternStreamBuilder.createPatternStream(
- inputStream,
- pattern,
+ return builder.build(
outTypeInfo,
- comparator,
- lateDataOutputTag,
- clean(patternProcessFunction));
+ builder.clean(patternProcessFunction));
}
/**
@@ -162,22 +135,13 @@ public class PatternStream<T> {
0,
1,
TypeExtractor.NO_INDEX,
- inputStream.getType(),
+ builder.getInputType(),
null,
false);
return select(patternSelectFunction, returnType);
}
- /**
- * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
- * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
- *
- * @return The cleaned Function
- */
- private <F> F clean(F f) {
- return inputStream.getExecutionEnvironment().clean(f);
- }
/**
* Applies a select function to the detected pattern sequence. For each pattern sequence the
@@ -196,7 +160,7 @@ public class PatternStream<T> {
final TypeInformation<R> outTypeInfo) {
final PatternProcessFunction<T, R> processFunction =
- fromSelect(clean(patternSelectFunction)).build();
+ fromSelect(builder.clean(patternSelectFunction)).build();
return process(processFunction, outTypeInfo);
}
@@ -236,7 +200,7 @@ public class PatternStream<T> {
0,
1,
TypeExtractor.NO_INDEX,
- inputStream.getType(),
+ builder.getInputType(),
null,
false);
@@ -279,8 +243,8 @@ public class PatternStream<T> {
final PatternSelectFunction<T, R> patternSelectFunction) {
final PatternProcessFunction<T, R> processFunction =
- fromSelect(clean(patternSelectFunction))
- .withTimeoutHandler(timedOutPartialMatchesTag, clean(patternTimeoutFunction))
+ fromSelect(builder.clean(patternSelectFunction))
+ .withTimeoutHandler(timedOutPartialMatchesTag, builder.clean(patternTimeoutFunction))
.build();
return process(processFunction, outTypeInfo);
@@ -319,7 +283,7 @@ public class PatternStream<T> {
0,
1,
TypeExtractor.NO_INDEX,
- inputStream.getType(),
+ builder.getInputType(),
null,
false);
@@ -329,7 +293,7 @@ public class PatternStream<T> {
0,
1,
TypeExtractor.NO_INDEX,
- inputStream.getType(),
+ builder.getInputType(),
null,
false);
@@ -338,17 +302,17 @@ public class PatternStream<T> {
final OutputTag<L> outputTag = new OutputTag<>(UUID.randomUUID().toString(), timeoutTypeInfo);
final PatternProcessFunction<T, R> processFunction =
- fromSelect(clean(patternSelectFunction))
- .withTimeoutHandler(outputTag, clean(patternTimeoutFunction))
- .build();
+ fromSelect(builder.clean(patternSelectFunction))
+ .withTimeoutHandler(outputTag, builder.clean(patternTimeoutFunction))
+ .build();
final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
return mainStream
- .connect(timedOutStream)
- .map(new CoMapTimeout<>())
- .returns(outTypeInfo);
+ .connect(timedOutStream)
+ .map(new CoMapTimeout<>())
+ .returns(outTypeInfo);
}
/**
@@ -362,17 +326,17 @@ public class PatternStream<T> {
* @return {@link DataStream} which contains the resulting elements from the pattern flat select
* function.
*/
- public <R> SingleOutputStreamOperator<R> flatSelect(
- final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+ public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
// we have to extract the output type from the provided pattern selection function manually
// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
+
final TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternFlatSelectFunction,
PatternFlatSelectFunction.class,
0,
1,
- new int[] {1, 0},
- inputStream.getType(),
+ new int[]{1, 0},
+ builder.getInputType(),
null,
false);
@@ -396,7 +360,8 @@ public class PatternStream<T> {
final TypeInformation<R> outTypeInfo) {
final PatternProcessFunction<T, R> processFunction =
- fromFlatSelect(clean(patternFlatSelectFunction)).build();
+ fromFlatSelect(builder.clean(patternFlatSelectFunction))
+ .build();
return process(processFunction, outTypeInfo);
}
@@ -436,11 +401,12 @@ public class PatternStream<T> {
0,
1,
new int[]{1, 0},
- inputStream.getType(),
+ builder.getInputType(),
null,
false);
- return flatSelect(timedOutPartialMatchesTag,
+ return flatSelect(
+ timedOutPartialMatchesTag,
patternFlatTimeoutFunction,
rightTypeInfo,
patternFlatSelectFunction);
@@ -478,8 +444,8 @@ public class PatternStream<T> {
final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
final PatternProcessFunction<T, R> processFunction =
- fromFlatSelect(clean(patternFlatSelectFunction))
- .withTimeoutHandler(timedOutPartialMatchesTag, clean(patternFlatTimeoutFunction))
+ fromFlatSelect(builder.clean(patternFlatSelectFunction))
+ .withTimeoutHandler(timedOutPartialMatchesTag, builder.clean(patternFlatTimeoutFunction))
.build();
return process(processFunction, outTypeInfo);
@@ -519,7 +485,7 @@ public class PatternStream<T> {
0,
1,
new int[]{2, 0},
- inputStream.getType(),
+ builder.getInputType(),
null,
false);
@@ -529,15 +495,15 @@ public class PatternStream<T> {
0,
1,
new int[]{1, 0},
- inputStream.getType(),
+ builder.getInputType(),
null,
false);
final OutputTag<L> outputTag = new OutputTag<>(UUID.randomUUID().toString(), timedOutTypeInfo);
final PatternProcessFunction<T, R> processFunction =
- fromFlatSelect(clean(patternFlatSelectFunction))
- .withTimeoutHandler(outputTag, clean(patternFlatTimeoutFunction))
+ fromFlatSelect(builder.clean(patternFlatSelectFunction))
+ .withTimeoutHandler(outputTag, builder.clean(patternFlatTimeoutFunction))
.build();
final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
@@ -545,14 +511,9 @@ public class PatternStream<T> {
final TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(timedOutTypeInfo, mainTypeInfo);
return mainStream
- .connect(timedOutStream)
- .map(new CoMapTimeout<>())
- .returns(outTypeInfo);
- }
-
- public PatternStream<T> sideOutputLateData(OutputTag<T> lateDataOutputTag) {
- this.lateDataOutputTag = clean(lateDataOutputTag);
- return this;
+ .connect(timedOutStream)
+ .map(new CoMapTimeout<>())
+ .returns(outTypeInfo);
}
/**
@@ -564,12 +525,12 @@ public class PatternStream<T> {
private static final long serialVersionUID = 2059391566945212552L;
@Override
- public Either<L, R> map1(R value) throws Exception {
+ public Either<L, R> map1(R value) {
return Either.Right(value);
}
@Override
- public Either<L, R> map2(L value) throws Exception {
+ public Either<L, R> map2(L value) {
return Either.Left(value);
}
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
index a2c430a..13d68e2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
@@ -19,6 +19,7 @@
package org.apache.flink.cep;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
@@ -45,46 +46,77 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Utility method for creating {@link PatternStream}.
*/
@Internal
-final class PatternStreamBuilder {
+final class PatternStreamBuilder<IN> {
+
+ private final DataStream<IN> inputStream;
+
+ private final Pattern<IN, ?> pattern;
+
+ private final EventComparator<IN> comparator;
+
+ /**
+ * Side output {@code OutputTag} for late data.
+ * If no tag is set late data will be simply dropped.
+ */
+ private final OutputTag<IN> lateDataOutputTag;
+
+ private PatternStreamBuilder(
+ final DataStream<IN> inputStream,
+ final Pattern<IN, ?> pattern,
+ @Nullable final EventComparator<IN> comparator,
+ @Nullable final OutputTag<IN> lateDataOutputTag) {
+
+ this.inputStream = checkNotNull(inputStream);
+ this.pattern = checkNotNull(pattern);
+ this.comparator = comparator;
+ this.lateDataOutputTag = lateDataOutputTag;
+ }
+
+ TypeInformation<IN> getInputType() {
+ return inputStream.getType();
+ }
+
+ /**
+ * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
+ * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
+ *
+ * @return The cleaned Function
+ */
+ <F> F clean(F f) {
+ return inputStream.getExecutionEnvironment().clean(f);
+ }
+
+ PatternStreamBuilder<IN> withComparator(final EventComparator<IN> comparator) {
+ return new PatternStreamBuilder<>(inputStream, pattern, checkNotNull(comparator), lateDataOutputTag);
+ }
+
+ PatternStreamBuilder<IN> withLateDataOutputTag(final OutputTag<IN> lateDataOutputTag) {
+ return new PatternStreamBuilder<>(inputStream, pattern, comparator, checkNotNull(lateDataOutputTag));
+ }
/**
* Creates a data stream containing results of {@link PatternProcessFunction} to fully matching event patterns.
*
- * @param inputStream stream of input events
- * @param pattern pattern to be search for in the stream
* @param processFunction function to be applied to matching event sequences
* @param outTypeInfo output TypeInformation of
* {@link PatternProcessFunction#processMatch(Map, PatternProcessFunction.Context, Collector)}
- * @param <IN> type of input events
* @param <OUT> type of output events
* @return Data stream containing fully matched event sequence with applied {@link PatternProcessFunction}
*/
- static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
- final DataStream<IN> inputStream,
- final Pattern<IN, ?> pattern,
+ <OUT, K> SingleOutputStreamOperator<OUT> build(
final TypeInformation<OUT> outTypeInfo,
- @Nullable final EventComparator<IN> comparator,
- @Nullable final OutputTag<IN> lateDataOutputTag,
final PatternProcessFunction<IN, OUT> processFunction) {
- checkNotNull(inputStream);
- checkNotNull(pattern);
checkNotNull(outTypeInfo);
checkNotNull(processFunction);
- final TypeSerializer<IN> inputSerializer =
- inputStream.getType().createSerializer(inputStream.getExecutionConfig());
-
- // check whether we use processing time
- final boolean isProcessingTime =
- inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+ final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
+ final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
-
- // compile our pattern into a NFAFactory to instantiate NFAs later on
final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
- CepOperator<IN, K, OUT> operator = new CepOperator<>(
+ final CepOperator<IN, K, OUT> operator = new CepOperator<>(
inputSerializer,
isProcessingTime,
nfaFactory,
@@ -108,12 +140,15 @@ final class PatternStreamBuilder {
"GlobalCepOperator",
outTypeInfo,
operator
- ).forceNonParallel();
+ ).forceNonParallel();
}
return patternStream;
}
- private PatternStreamBuilder() {
+ // ---------------------------------------- factory-like methods ---------------------------------------- //
+
+ static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
+ return new PatternStreamBuilder<>(inputStream, pattern, null, null);
}
}