You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/08/24 06:19:40 UTC
flink git commit: [hotfix] [cep] Spelling corrections
Repository: flink
Updated Branches:
refs/heads/master 3e706b13a -> b3ffd919f
[hotfix] [cep] Spelling corrections
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3ffd919
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3ffd919
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3ffd919
Branch: refs/heads/master
Commit: b3ffd919fd1497fb838d36d5b3a31b2402f0de71
Parents: 3e706b1
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Thu Aug 24 08:18:58 2017 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Thu Aug 24 08:18:58 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/cep/PatternStream.java | 26 +++++++++++---------
.../main/java/org/apache/flink/cep/nfa/NFA.java | 4 +--
.../AbstractKeyedCEPPatternOperator.java | 8 +++---
.../flink/cep/operator/CEPOperatorUtils.java | 20 +++++++--------
.../cep/operator/FlatSelectCepOperator.java | 4 +--
.../operator/FlatSelectTimeoutCepOperator.java | 16 ++++++------
.../flink/cep/operator/SelectCepOperator.java | 4 +--
.../cep/operator/SelectTimeoutCepOperator.java | 18 +++++++-------
.../flink/cep/operator/CEPOperatorTest.java | 8 +++---
9 files changed, 55 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 6380375..79ca736 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.types.Either;
import org.apache.flink.util.OutputTag;
+import java.util.UUID;
+
/**
* Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
* pattern sequences as a map of events associated with their names. The pattern is detected using a
@@ -145,7 +147,7 @@ public class PatternStream<T> {
* {@link SingleOutputStreamOperator} resulting from the select operation
* with the same {@link OutputTag}.
*
- * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns
+ * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
* @param patternTimeoutFunction The pattern timeout function which is called for each partial
* pattern sequence which has timed out.
* @param patternSelectFunction The pattern select function which is called for each detected
@@ -192,7 +194,7 @@ public class PatternStream<T> {
* {@link SingleOutputStreamOperator} resulting from the select operation
* with the same {@link OutputTag}.
*
- * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns
+ * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
* @param patternTimeoutFunction The pattern timeout function which is called for each partial
* pattern sequence which has timed out.
* @param outTypeInfo Explicit specification of output type.
@@ -235,7 +237,7 @@ public class PatternStream<T> {
* @param <R> Type of the resulting elements
*
* @deprecated Use {@link PatternStream#select(OutputTag, PatternTimeoutFunction, PatternSelectFunction)}
- * that returns timeouted events as a side-output
+ * that returns timed out events as a side-output
*
* @return {@link DataStream} which contains the resulting elements or the resulting timeout
* elements wrapped in an {@link Either} type.
@@ -267,7 +269,7 @@ public class PatternStream<T> {
null,
false);
- final OutputTag<L> outputTag = new OutputTag<L>("dummy-timeouted", leftTypeInfo);
+ final OutputTag<L> outputTag = new OutputTag<L>(UUID.randomUUID().toString(), leftTypeInfo);
final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream(
inputStream,
@@ -278,11 +280,11 @@ public class PatternStream<T> {
outputTag,
clean(patternTimeoutFunction));
- final DataStream<L> timeoutedStream = mainStream.getSideOutput(outputTag);
+ final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
- return mainStream.connect(timeoutedStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
+ return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
}
/**
@@ -350,7 +352,7 @@ public class PatternStream<T> {
* {@link SingleOutputStreamOperator} resulting from the select operation
* with the same {@link OutputTag}.
*
- * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns
+ * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
* @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial
* pattern sequence which has timed out.
* @param patternFlatSelectFunction The pattern select function which is called for each detected
@@ -393,7 +395,7 @@ public class PatternStream<T> {
* {@link SingleOutputStreamOperator} resulting from the select operation
* with the same {@link OutputTag}.
*
- * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns
+ * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
* @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial
* pattern sequence which has timed out.
* @param patternFlatSelectFunction The pattern select function which is called for each detected
@@ -437,7 +439,7 @@ public class PatternStream<T> {
* @param <R> Type of the resulting events
*
* @deprecated Use {@link PatternStream#flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)}
- * that returns timeouted events as a side-output
+ * that returns timed out events as a side-output
*
* @return {@link DataStream} which contains the resulting events from the pattern flat select
* function or the resulting timeout events from the pattern flat timeout function wrapped in an
@@ -470,7 +472,7 @@ public class PatternStream<T> {
null,
false);
- final OutputTag<L> outputTag = new OutputTag<L>("dummy-timeouted", leftTypeInfo);
+ final OutputTag<L> outputTag = new OutputTag<L>(UUID.randomUUID().toString(), leftTypeInfo);
final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream(
inputStream,
@@ -481,11 +483,11 @@ public class PatternStream<T> {
outputTag,
clean(patternFlatTimeoutFunction));
- final DataStream<L> timeoutedStream = mainStream.getSideOutput(outputTag);
+ final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
- return mainStream.connect(timeoutedStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
+ return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 3a1f621..11f14b9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -249,8 +249,8 @@ public class NFA<T> implements Serializable {
if (handleTimeout) {
// extract the timed out event pattern
- Map<String, List<T>> timedoutPattern = extractCurrentMatches(computationState);
- timeoutResult.add(Tuple2.of(timedoutPattern, timestamp));
+ Map<String, List<T>> timedOutPattern = extractCurrentMatches(computationState);
+ timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
}
eventSharedBuffer.release(
http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 4c67e9d..7556d9f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -80,7 +80,7 @@ import java.util.stream.StreamSupport;
* @param <IN> Type of the input elements
* @param <KEY> Type of the key on which the input stream is keyed
* @param <OUT> Type of the output elements
- * @param <F> user function that can be applied to matching sequences or timeouted sequences
+ * @param <F> user function that can be applied to matching sequences or timed out sequences
*/
public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function>
extends AbstractUdfStreamOperator<OUT, F>
@@ -359,7 +359,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
try {
processMatchedSequences(patterns.f0, timestamp);
- processTimeoutedSequence(patterns.f1, timestamp);
+ processTimedOutSequences(patterns.f1, timestamp);
} catch (Exception e) {
//rethrow as Runtime, to be able to use processEvent in Stream.
throw new RuntimeException(e);
@@ -377,9 +377,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
processEvent(nfa, null, timestamp);
}
- protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception;
+ protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception;
- protected void processTimeoutedSequence(
+ protected void processTimedOutSequences(
Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences,
long timestamp) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index a662faf..cef11e2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -143,19 +143,19 @@ public class CEPOperatorUtils {
/**
* Creates a data stream containing results of {@link PatternFlatSelectFunction} to fully matching event patterns and
- * also timeouted partially matched with applied {@link PatternFlatTimeoutFunction} as a sideoutput.
+ * also timed out partially matched with applied {@link PatternFlatTimeoutFunction} as a sideoutput.
*
* @param inputStream stream of input events
* @param pattern pattern to be searched for in the stream
* @param selectFunction function to be applied to matching event sequences
* @param outTypeInfo output TypeInformation of selectFunction
- * @param outputTag {@link OutputTag} for a side-output with timeouted matches
- * @param timeoutFunction function to be applied to timeouted event sequences
+ * @param outputTag {@link OutputTag} for a side-output with timed out matches
+ * @param timeoutFunction function to be applied to timed out event sequences
* @param <IN> type of input events
* @param <OUT1> type of fully matched events
- * @param <OUT2> type of timeouted events
+ * @param <OUT2> type of timed out events
* @return Data stream containing fully matched event sequence with applied {@link PatternFlatSelectFunction} that
- * contains timeouted patterns with applied {@link PatternFlatTimeoutFunction} as side-output
+ * contains timed out patterns with applied {@link PatternFlatTimeoutFunction} as side-output
*/
public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(
final DataStream<IN> inputStream,
@@ -201,19 +201,19 @@ public class CEPOperatorUtils {
/**
* Creates a data stream containing results of {@link PatternSelectFunction} to fully matching event patterns and
- * also timeouted partially matched with applied {@link PatternTimeoutFunction} as a sideoutput.
+ * also timed out partially matched with applied {@link PatternTimeoutFunction} as a sideoutput.
*
* @param inputStream stream of input events
* @param pattern pattern to be searched for in the stream
* @param selectFunction function to be applied to matching event sequences
* @param outTypeInfo output TypeInformation of selectFunction
- * @param outputTag {@link OutputTag} for a side-output with timeouted matches
- * @param timeoutFunction function to be applied to timeouted event sequences
+ * @param outputTag {@link OutputTag} for a side-output with timed out matches
+ * @param timeoutFunction function to be applied to timed out event sequences
* @param <IN> type of input events
* @param <OUT1> type of fully matched events
- * @param <OUT2> type of timeouted events
+ * @param <OUT2> type of timed out events
* @return Data stream containing fully matched event sequence with applied {@link PatternSelectFunction} that
- * contains timeouted patterns with applied {@link PatternTimeoutFunction} as side-output
+ * contains timed out patterns with applied {@link PatternTimeoutFunction} as side-output
*/
public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(
final DataStream<IN> inputStream,
http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
index d44794e..192a38b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
@@ -58,8 +58,8 @@ public class FlatSelectCepOperator<IN, KEY, OUT>
}
@Override
- protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception {
- for (Map<String, List<IN>> match : matchesSequence) {
+ protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
+ for (Map<String, List<IN>> match : matchingSequences) {
collector.setAbsoluteTimestamp(timestamp);
getUserFunction().flatSelect(match, collector);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
index d46761b..58a9d53 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
@@ -35,13 +35,13 @@ import java.util.Map;
/**
* Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternFlatSelectFunction} to fully
- * matched event patterns and {@link PatternFlatTimeoutFunction} to timeouted ones. The timeouted elements are returned
+ * matched event patterns and {@link PatternFlatTimeoutFunction} to timed out ones. The timed out elements are returned
* as a side-output.
*
* @param <IN> Type of the input elements
* @param <KEY> Type of the key on which the input stream is keyed
* @param <OUT1> Type of the output elements
- * @param <OUT2> Type of the timeouted output elements
+ * @param <OUT2> Type of the timed out output elements
*/
public class FlatSelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends
AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, FlatSelectTimeoutCepOperator.FlatSelectWrapper<IN, OUT1, OUT2>> {
@@ -50,7 +50,7 @@ public class FlatSelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends
private transient TimestampedSideOutputCollector<OUT2> sideOutputCollector;
- private OutputTag<OUT2> timeoutedOutputTag;
+ private OutputTag<OUT2> timedOutOutputTag;
public FlatSelectTimeoutCepOperator(
TypeSerializer<IN> inputSerializer,
@@ -70,27 +70,27 @@ public class FlatSelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends
migratingFromOldKeyedOperator,
comparator,
new FlatSelectWrapper<>(flatSelectFunction, flatTimeoutFunction));
- this.timeoutedOutputTag = outputTag;
+ this.timedOutOutputTag = outputTag;
}
@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
- sideOutputCollector = new TimestampedSideOutputCollector<>(timeoutedOutputTag, output);
+ sideOutputCollector = new TimestampedSideOutputCollector<>(timedOutOutputTag, output);
}
@Override
protected void processMatchedSequences(
- Iterable<Map<String, List<IN>>> matchesSequence,
+ Iterable<Map<String, List<IN>>> matchingSequences,
long timestamp) throws Exception {
- for (Map<String, List<IN>> match : matchesSequence) {
+ for (Map<String, List<IN>> match : matchingSequences) {
getUserFunction().getFlatSelectFunction().flatSelect(match, collector);
}
}
@Override
- protected void processTimeoutedSequence(
+ protected void processTimedOutSequences(
Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception {
for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) {
sideOutputCollector.setAbsoluteTimestamp(timestamp);
http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
index d687c67..acd3cd3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
@@ -48,8 +48,8 @@ public class SelectCepOperator<IN, KEY, OUT>
}
@Override
- protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception {
- for (Map<String, List<IN>> match : matchesSequence) {
+ protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
+ for (Map<String, List<IN>> match : matchingSequences) {
output.collect(new StreamRecord<>(getUserFunction().select(match), timestamp));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
index b46b801..d03e25c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
@@ -34,18 +34,18 @@ import java.util.Map;
/**
* Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternSelectFunction} to fully
- * matched event patterns and {@link PatternTimeoutFunction} to timeouted ones. The timeouted elements are returned
+ * matched event patterns and {@link PatternTimeoutFunction} to timed out ones. The timed out elements are returned
* as a side-output.
*
* @param <IN> Type of the input elements
* @param <KEY> Type of the key on which the input stream is keyed
* @param <OUT1> Type of the output elements
- * @param <OUT2> Type of the timeouted output elements
+ * @param <OUT2> Type of the timed out output elements
*/
public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {
- private OutputTag<OUT2> timeoutedOutputTag;
+ private OutputTag<OUT2> timedOutOutputTag;
public SelectTimeoutCepOperator(
TypeSerializer<IN> inputSerializer,
@@ -65,21 +65,21 @@ public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
migratingFromOldKeyedOperator,
comparator,
new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction));
- this.timeoutedOutputTag = outputTag;
+ this.timedOutOutputTag = outputTag;
}
@Override
- protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchesSequence, long timestamp) throws Exception {
- for (Map<String, List<IN>> match : matchesSequence) {
+ protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
+ for (Map<String, List<IN>> match : matchingSequences) {
output.collect(new StreamRecord<>(getUserFunction().getFlatSelectFunction().select(match), timestamp));
}
}
@Override
- protected void processTimeoutedSequence(
+ protected void processTimedOutSequences(
Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception {
for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) {
- output.collect(timeoutedOutputTag,
+ output.collect(timedOutOutputTag,
new StreamRecord<>(
getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1),
timestamp));
@@ -91,7 +91,7 @@ public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
*
* @param <IN> Type of the input elements
* @param <OUT1> Type of the output elements
- * @param <OUT2> Type of the timeouted output elements
+ * @param <OUT2> Type of the timed out output elements
*/
@Internal
public static class SelectWrapper<IN, OUT1, OUT2> implements Function {
http://git-wip-us.apache.org/repos/asf/flink/blob/b3ffd919/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 14cdd53..a2ac124 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -249,8 +249,8 @@ public class CEPOperatorTest extends TestLogger {
final Map<String, List<Event>> expectedSequence = new HashMap<>(2);
expectedSequence.put("start", Collections.<Event>singletonList(startEvent));
- final OutputTag<Tuple2<Map<String, List<Event>>, Long>> timeouted =
- new OutputTag<Tuple2<Map<String, List<Event>>, Long>>("timeouted") {};
+ final OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOut =
+ new OutputTag<Tuple2<Map<String, List<Event>>, Long>>("timedOut") {};
final KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
new KeyedOneInputStreamOperatorTestHarness<>(
new SelectTimeoutCepOperator<>(
@@ -274,7 +274,7 @@ public class CEPOperatorTest extends TestLogger {
return Tuple2.of(pattern, timeoutTimestamp);
}
},
- timeouted
+ timedOut
), new KeySelector<Event, Integer>() {
@Override
public Integer getKey(Event value) throws Exception {
@@ -294,7 +294,7 @@ public class CEPOperatorTest extends TestLogger {
harness.processWatermark(new Watermark(watermarkTimestamp2));
Queue<Object> result = harness.getOutput();
- Queue<StreamRecord<Tuple2<Map<String, List<Event>>, Long>>> sideOutput = harness.getSideOutput(timeouted);
+ Queue<StreamRecord<Tuple2<Map<String, List<Event>>, Long>>> sideOutput = harness.getSideOutput(timedOut);
assertEquals(2L, result.size());
assertEquals(1L, sideOutput.size());