Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/10 13:21:16 UTC

[1/2] flink git commit: [FLINK-5033] [cep] Advance time with incoming watermarks at CEP operator

Repository: flink
Updated Branches:
  refs/heads/master 616c4f5e4 -> 6516938b9


[FLINK-5033] [cep] Advance time with incoming watermarks at CEP operator

Previously, time was only advanced if the CEP operator had events buffered. If the priority queue
was empty, an incoming watermark did not advance the time, which led to missed timeouts and lost
pruning opportunities. This PR fixes the problem by advancing the NFA's time on every watermark,
even when no elements are buffered.

This closes #2771.
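
For context, the core of the change is that the operator's watermark handling now advances the
NFA's time even when nothing is buffered. The following is a minimal, self-contained sketch of
that pattern; the class and method names (SimpleCepOperatorSketch, onWatermark, processEvent,
advanceTime) are hypothetical stand-ins, not the actual Flink operator API:

import java.util.Comparator;
import java.util.PriorityQueue;

/**
 * Minimal sketch (hypothetical class, not Flink code) of the watermark handling
 * pattern introduced by this commit: time is advanced on the state machine even
 * when no elements are buffered, so timeouts and pruning still happen.
 */
public class SimpleCepOperatorSketch {

	/** A stand-in for a buffered element together with its event timestamp. */
	static final class Element {
		final String value;
		final long timestamp;

		Element(String value, long timestamp) {
			this.value = value;
			this.timestamp = timestamp;
		}
	}

	private final PriorityQueue<Element> buffer =
		new PriorityQueue<>(Comparator.comparingLong((Element e) -> e.timestamp));

	/** Called for every incoming watermark. */
	void onWatermark(long watermarkTimestamp) {
		if (buffer.isEmpty()) {
			// Before the fix nothing happened here; now time is still advanced,
			// which can trigger timeouts and pruning in the state machine.
			advanceTime(watermarkTimestamp);
		} else {
			while (!buffer.isEmpty() && buffer.peek().timestamp <= watermarkTimestamp) {
				Element element = buffer.poll();
				processEvent(element.value, element.timestamp);
			}
		}
	}

	void processEvent(String value, long timestamp) {
		System.out.println("process " + value + " @ " + timestamp);
	}

	void advanceTime(long timestamp) {
		System.out.println("advance time to " + timestamp + " (prune / time out)");
	}

	public static void main(String[] args) {
		SimpleCepOperatorSketch op = new SimpleCepOperatorSketch();
		op.onWatermark(5L); // empty buffer: time is still advanced
	}
}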


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/029fda24
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/029fda24
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/029fda24

Branch: refs/heads/master
Commit: 029fda24583c6655cd97d02773b5ca628cf9e8f1
Parents: 616c4f5
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 8 15:14:36 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 10 14:18:01 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  10 +-
 .../AbstractCEPBasePatternOperator.java         |   9 ++
 .../operator/AbstractCEPPatternOperator.java    |  12 +-
 .../AbstractKeyedCEPPatternOperator.java        |  11 +-
 .../flink/cep/operator/CEPPatternOperator.java  |  22 ++-
 .../cep/operator/KeyedCEPPatternOperator.java   |  22 ++-
 .../cep/operator/TimeoutCEPPatternOperator.java |  35 ++--
 .../TimeoutKeyedCEPPatternOperator.java         |  39 +++--
 .../flink/cep/operator/CEPOperatorTest.java     | 161 ++++++++++++++++++-
 .../util/AbstractStreamOperatorTestHarness.java |  20 ++-
 10 files changed, 296 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 5ac638e..aefddb2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -123,7 +123,7 @@ public class NFA<T> implements Serializable {
 	 * resulting event sequences are returned. If computations time out and timeout handling is
 	 * activated, then the timed out event patterns are returned.
 	 *
-	 * @param event The current event to be processed
+	 * @param event The current event to be processed or null if only pruning shall be done
 	 * @param timestamp The timestamp of the current event
 	 * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
 	 * reached a final state) and the collection of timed out patterns (if timeout handling is
@@ -141,7 +141,7 @@ public class NFA<T> implements Serializable {
 			final Collection<ComputationState<T>> newComputationStates;
 
 			if (!computationState.isStartState() &&
-				windowTime > 0 &&
+				windowTime > 0L &&
 				timestamp - computationState.getStartTimestamp() >= windowTime) {
 
 				if (handleTimeout) {
@@ -158,8 +158,10 @@ public class NFA<T> implements Serializable {
 				sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
 
 				newComputationStates = Collections.emptyList();
-			} else {
+			} else if (event != null) {
 				newComputationStates = computeNextStates(computationState, event, timestamp);
+			} else {
+				newComputationStates = Collections.singleton(computationState);
 			}
 
 			for (ComputationState<T> newComputationState: newComputationStates) {
@@ -179,7 +181,7 @@ public class NFA<T> implements Serializable {
 		}
 
 		// prune shared buffer based on window length
-		if(windowTime > 0) {
+		if(windowTime > 0L) {
 			long pruningTimestamp = timestamp - windowTime;
 
 			// sanity check to guard against underflows

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
index 2f21346..a3497a6 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
@@ -96,4 +96,13 @@ public abstract class AbstractCEPBasePatternOperator<IN, OUT>
 	 * @param timestamp The timestamp of the event
 	 */
 	protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+
+	/**
+	 * Advances the time for the given NFA to the given timestamp. This can lead to pruning and
+	 * timeouts.
+	 *
+	 * @param nfa to advance the time for
+	 * @param timestamp to advance the time to
+	 */
+	protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 1c494ef..fe9aced 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -97,10 +97,14 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 		// we do our own watermark handling, no super call. we will never be able to use
 		// the timer service like this, however.
 
-		while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
-			StreamRecord<IN> streamRecord = priorityQueue.poll();
-
-			processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+		if (priorityQueue.isEmpty()) {
+			advanceTime(nfa, mark.getTimestamp());
+		} else {
+			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+				StreamRecord<IN> streamRecord = priorityQueue.poll();
+
+				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+			}
 		}
 
 		output.emitWatermark(mark);

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7541d8f..b5601ef 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -176,11 +176,16 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
 			NFA<IN> nfa = getNFA();
 
-			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
-				StreamRecord<IN> streamRecord = priorityQueue.poll();
+			if (priorityQueue.isEmpty()) {
+					advanceTime(nfa, mark.getTimestamp());
+			} else {
+				while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+					StreamRecord<IN> streamRecord = priorityQueue.poll();
 
-				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+					processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+				}
 			}
+
 			updateNFA(nfa);
 			updatePriorityQueue(priorityQueue);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
index 561697d..57f27c2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -48,15 +49,28 @@ public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, Map<S
 
 		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 
-		if (!matchedPatterns.isEmpty()) {
+		emitMatchedSequences(matchedPatterns, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+	}
+
+	private void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
+		Iterator<Map<String, IN>> iterator = matchedSequences.iterator();
+
+		if (iterator.hasNext()) {
 			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
 				null,
 				timestamp);
 
-			for (Map<String, IN> pattern: matchedPatterns) {
-				streamRecord.replace(pattern);
+			do {
+				streamRecord.replace(iterator.next());
 				output.collect(streamRecord);
-			}
+			} while (iterator.hasNext());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 62d82d9..4d8a907 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -51,15 +52,28 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
 
 		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 
-		if (!matchedPatterns.isEmpty()) {
+		emitMatchedSequences(matchedPatterns, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+	}
+
+	private void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
+		Iterator<Map<String, IN>> iterator = matchedSequences.iterator();
+
+		if (iterator.hasNext()) {
 			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
 				null,
 				timestamp);
 
-			for (Map<String, IN> pattern: matchedPatterns) {
-				streamRecord.replace(pattern);
+			do {
+				streamRecord.replace(iterator.next());
 				output.collect(streamRecord);
-			}
+			} while (iterator.hasNext());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
index 9b0c951..9a04468 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
@@ -58,22 +58,37 @@ public class TimeoutCEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN
 		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 		Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;
 
+		emitMatchedSequences(matchedPatterns, timestamp);
+		emitTimedOutSequences(partialPatterns, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+		emitTimedOutSequences(patterns.f1, timestamp);
+	}
+
+	private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) {
 		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
 			null,
 			timestamp);
 
-		if (!matchedPatterns.isEmpty()) {
-			for (Map<String, IN> matchedPattern : matchedPatterns) {
-				streamRecord.replace(Either.Right(matchedPattern));
-				output.collect(streamRecord);
-			}
+		for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
+			streamRecord.replace(Either.Left(partialPattern));
+			output.collect(streamRecord);
 		}
+	}
+
+	protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
+		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
+			null,
+			timestamp);
 
-		if (!partialPatterns.isEmpty()) {
-			for (Tuple2<Map<String, IN>, Long> partialPattern: partialPatterns) {
-				streamRecord.replace(Either.Left(partialPattern));
-				output.collect(streamRecord);
-			}
+		for (Map<String, IN> matchedPattern : matchedSequences) {
+			streamRecord.replace(Either.Right(matchedPattern));
+			output.collect(streamRecord);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 23b1a91..4d33435 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -48,25 +48,40 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
 			event,
 			timestamp);
 
-		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-		Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;
+		Collection<Map<String, IN>> matchedSequences = patterns.f0;
+		Collection<Tuple2<Map<String, IN>, Long>> timedOutSequences = patterns.f1;
 
+		emitMatchedSequences(matchedSequences, timestamp);
+		emitTimedOutSequences(timedOutSequences, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+		emitTimedOutSequences(patterns.f1, timestamp);
+	}
+
+	private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) {
 		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
 			null,
 			timestamp);
 
-		if (!matchedPatterns.isEmpty()) {
-			for (Map<String, IN> matchedPattern : matchedPatterns) {
-				streamRecord.replace(Either.Right(matchedPattern));
-				output.collect(streamRecord);
-			}
+		for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
+			streamRecord.replace(Either.Left(partialPattern));
+			output.collect(streamRecord);
 		}
+	}
+
+	protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
+		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
+			null,
+			timestamp);
 
-		if (!partialPatterns.isEmpty()) {
-			for (Tuple2<Map<String, IN>, Long> partialPattern: partialPatterns) {
-				streamRecord.replace(Either.Left(partialPattern));
-				output.collect(streamRecord);
-			}
+		for (Map<String, IN> matchedPattern : matchedSequences) {
+			streamRecord.replace(Either.Right(matchedPattern));
+			output.collect(streamRecord);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 6cffd9c..db17f6d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.cep.operator;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
@@ -35,6 +38,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
@@ -42,7 +46,9 @@ import org.junit.rules.TemporaryFolder;
 
 import static org.junit.Assert.*;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class CEPOperatorTest extends TestLogger {
@@ -411,10 +417,161 @@ public class CEPOperatorTest extends TestLogger {
 		harness.close();
 	}
 
+	/**
+	 * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033
+	 */
+	@Test
+	public void testKeyedAdvancingTimeWithoutElements() throws Exception {
+		final KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+		final Event startEvent = new Event(42, "start", 1.0);
+		final long watermarkTimestamp1 = 5L;
+		final long watermarkTimestamp2 = 13L;
+
+		final Map<String, Event> expectedSequence = new HashMap<>(2);
+		expectedSequence.put("start", startEvent);
+
+		OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>(
+			new TimeoutKeyedCEPPatternOperator<>(
+				Event.createTypeSerializer(),
+				false,
+				keySelector,
+				IntSerializer.INSTANCE,
+				new NFAFactory(true)),
+			keySelector,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+		try {
+			harness.setup(
+				new KryoSerializer<>(
+					(Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>) (Object) Either.class,
+					new ExecutionConfig()));
+			harness.open();
+
+			harness.processElement(new StreamRecord<>(startEvent, 3L));
+			harness.processWatermark(new Watermark(watermarkTimestamp1));
+			harness.processWatermark(new Watermark(watermarkTimestamp2));
+
+			Queue<Object> result = harness.getOutput();
+
+			assertEquals(3, result.size());
+
+			Object watermark1 = result.poll();
+
+			assertTrue(watermark1 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp());
+
+			Object resultObject = result.poll();
+
+			assertTrue(resultObject instanceof StreamRecord);
+
+			StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) resultObject;
+
+			assertTrue(streamRecord.getValue() instanceof Either.Left);
+
+			Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>) streamRecord.getValue();
+
+			Tuple2<Map<String, Event>, Long> leftResult = left.left();
+
+			assertEquals(watermarkTimestamp2, (long) leftResult.f1);
+			assertEquals(expectedSequence, leftResult.f0);
+
+			Object watermark2 = result.poll();
+
+			assertTrue(watermark2 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp2, ((Watermark) watermark2).getTimestamp());
+		} finally {
+			harness.close();
+		}
+	}
+
+	/**
+	 * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033
+	 */
+	@Test
+	public void testAdvancingTimeWithoutElements() throws Exception {
+		final Event startEvent = new Event(42, "start", 1.0);
+		final long watermarkTimestamp1 = 5L;
+		final long watermarkTimestamp2 = 13L;
+
+		final Map<String, Event> expectedSequence = new HashMap<>(2);
+		expectedSequence.put("start", startEvent);
+
+		OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new OneInputStreamOperatorTestHarness<>(
+			new TimeoutCEPPatternOperator<>(
+				Event.createTypeSerializer(),
+				false,
+				new NFAFactory(true))
+		);
+
+		try {
+			harness.setup(
+				new KryoSerializer<>(
+					(Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>) (Object) Either.class,
+					new ExecutionConfig()));
+			harness.open();
+
+			harness.processElement(new StreamRecord<>(startEvent, 3L));
+			harness.processWatermark(new Watermark(watermarkTimestamp1));
+			harness.processWatermark(new Watermark(watermarkTimestamp2));
+
+			Queue<Object> result = harness.getOutput();
+
+			assertEquals(3, result.size());
+
+			Object watermark1 = result.poll();
+
+			assertTrue(watermark1 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp());
+
+			Object resultObject = result.poll();
+
+			assertTrue(resultObject instanceof StreamRecord);
+
+			StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) resultObject;
+
+			assertTrue(streamRecord.getValue() instanceof Either.Left);
+
+			Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>) streamRecord.getValue();
+
+			Tuple2<Map<String, Event>, Long> leftResult = left.left();
+
+			assertEquals(watermarkTimestamp2, (long) leftResult.f1);
+			assertEquals(expectedSequence, leftResult.f0);
+
+			Object watermark2 = result.poll();
+
+			assertTrue(watermark2 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp2, ((Watermark) watermark2).getTimestamp());
+		} finally {
+			harness.close();
+		}
+	}
+
 	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
 
 		private static final long serialVersionUID = 1173020762472766713L;
 
+		private final boolean handleTimeout;
+
+		private NFAFactory() {
+			this(false);
+		}
+
+		private NFAFactory(boolean handleTimeout) {
+			this.handleTimeout = handleTimeout;
+		}
+
 		@Override
 		public NFA<Event> createNFA() {
 
@@ -444,9 +601,9 @@ public class CEPOperatorTest extends TestLogger {
 					})
 					// add a window timeout to test whether timestamps of elements in the
 					// priority queue in CEP operator are correctly checkpointed/restored
-					.within(Time.milliseconds(10));
+					.within(Time.milliseconds(10L));
 
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index c923b17..23a31d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -232,8 +232,16 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	 * Calls
 	 * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
 	 */
-	public void setup() throws Exception {
-		operator.setup(mockTask, config, new MockOutput());
+	public void setup() {
+		setup(null);
+	}
+
+	/**
+	 * Calls
+	 * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+	 */
+	public void setup(TypeSerializer<OUT> outputSerializer) {
+		operator.setup(mockTask, config, new MockOutput(outputSerializer));
 		setupCalled = true;
 	}
 
@@ -493,6 +501,14 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 		private TypeSerializer<OUT> outputSerializer;
 
+		MockOutput() {
+			this(null);
+		}
+
+		MockOutput(TypeSerializer<OUT> outputSerializer) {
+			this.outputSerializer = outputSerializer;
+		}
+
 		@Override
 		public void emitWatermark(Watermark mark) {
 			outputList.add(mark);


[2/2] flink git commit: [FLINK-5027] FileSource finishes successfully with a wrong path

Posted by tr...@apache.org.
[FLINK-5027] FileSource finishes successfully with a wrong path

Integrates the review comments; the file monitoring source now throws a FileNotFoundException
if the provided path does not exist, instead of finishing successfully without emitting anything.

This closes #2765.
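
The behavioural change in isolation: the source now verifies that the configured path exists
before it starts monitoring, rather than silently finishing. Below is a minimal standalone sketch
of that fail-fast guard using plain java.nio; the class and method names (PathGuardSketch,
runMonitoring) are hypothetical and do not reflect the Flink ContinuousFileMonitoringFunction API:

import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Sketch of the fail-fast guard: reject a non-existent input path up front. */
public class PathGuardSketch {

	static void runMonitoring(String path) throws FileNotFoundException {
		Path p = Paths.get(path);
		if (!Files.exists(p)) {
			// Without this check the source would simply find no files and
			// finish successfully, hiding a misconfigured path from the user.
			throw new FileNotFoundException(
				"The provided file path " + path + " does not exist.");
		}
		// ... start watching the directory / enumerating splits ...
		System.out.println("monitoring " + path);
	}

	public static void main(String[] args) {
		try {
			runMonitoring("/definitely/not/there");
		} catch (FileNotFoundException e) {
			System.out.println("rejected: " + e.getMessage());
		}
	}
}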


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6516938b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6516938b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6516938b

Branch: refs/heads/master
Commit: 6516938b93c08882587d32e8c39e7ebd9ed2fc7b
Parents: 029fda2
Author: kl0u <kk...@gmail.com>
Authored: Mon Nov 7 16:35:38 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 10 14:20:06 2016 +0100

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileProcessingTest.java | 27 ++++++++++++++++++++
 .../ContinuousFileMonitoringFunction.java       |  4 +++
 .../source/ContinuousFileReaderOperator.java    |  2 +-
 3 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6516938b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 0283f68..5b14251 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -46,6 +46,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -110,6 +111,32 @@ public class ContinuousFileProcessingTest {
 	//						TESTS
 
 	@Test
+	public void testInvalidPathSpecification() throws Exception {
+
+		String invalidPath = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/invalid/";
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, invalidPath,
+				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+		try {
+			monitoringFunction.run(new DummySourceContext() {
+				@Override
+				public void collect(TimestampedFileInputSplit element) {
+					// we should never arrive here with an invalid path
+					Assert.fail("Test passes with an invalid path.");
+				}
+			});
+
+			// we should never arrive here with an invalid path
+			Assert.fail("Test passed with an invalid path.");
+
+		} catch (FileNotFoundException e) {
+			Assert.assertEquals("The provided file path " + invalidPath + " does not exist.", e.getMessage());
+		}
+	}
+
+	@Test
 	public void testFileReadingOperatorWithIngestionTime() throws Exception {
 		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
 		Map<Integer, String> expectedFileContents = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/6516938b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index a6c5e49..10068a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -124,6 +125,9 @@ public class ContinuousFileMonitoringFunction<OUT>
 	@Override
 	public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
 		FileSystem fileSystem = FileSystem.get(new URI(path));
+		if (!fileSystem.exists(new Path(path))) {
+			throw new FileNotFoundException("The provided file path " + path + " does not exist.");
+		}
 
 		checkpointLock = context.getCheckpointLock();
 		switch (watchType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6516938b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index c8e9846..19e4737 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -311,7 +311,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 
 			} finally {
 				synchronized (checkpointLock) {
-					LOG.info("Reader terminated, and exiting...");
+					LOG.debug("Reader terminated, and exiting...");
 
 					try {
 						this.format.closeInputFormat();