Posted to commits@flink.apache.org by mb...@apache.org on 2015/10/06 17:12:24 UTC

[2/5] flink git commit: [FLINK-2283] [streaming] Test for checkpointing in internal operators

[FLINK-2283] [streaming] Test for checkpointing in internal operators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a76d9638
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a76d9638
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a76d9638

Branch: refs/heads/master
Commit: a76d9638e83947d491e6ae1edb82f020b6ba8f54
Parents: 225704b
Author: mbalassi <mb...@apache.org>
Authored: Sun Sep 20 22:27:11 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Oct 6 14:46:06 2015 +0200

----------------------------------------------------------------------
 .../api/operators/StreamGroupedFold.java        |   1 -
 .../api/operators/StreamGroupedReduce.java      |   1 -
 .../UdfStreamOperatorCheckpointingITCase.java   | 267 +++++++++++++++++++
 3 files changed, 267 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a76d9638/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index f8f167a..ad40b85 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -23,7 +23,6 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/a76d9638/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index e1f9f06..68ebd8f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.operators;
 
 import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;

http://git-wip-us.apache.org/repos/asf/flink/blob/a76d9638/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
new file mode 100644
index 0000000..263bef1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import com.google.common.collect.EvictingQueue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.GroupedDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Queue;
+import java.util.Random;
+
+/**
+ * Integration test ensuring that the persistent state defined by the implementations
+ * of {@link AbstractUdfStreamOperator} is correctly restored in case of recovery from
+ * a failure.
+ *
+ * <p>
+ * The topology currently tests the proper behaviour of the {@link StreamGroupedReduce}
+ * operator.
+ */
+@SuppressWarnings("serial")
+public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {
+
+	final private static long NUM_INPUT = 2_500_000L;
+	final private static int NUM_OUTPUT = 1_000;
+
+	/**
+	 * Assembles a stream of a grouping field and some long data. Applies reduce functions
+	 * on this stream.
+	 */
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+
+		// base stream
+		GroupedDataStream<Tuple2<Integer, Long>> stream = env.addSource(new StatefulMultipleSequence())
+				.groupBy(0);
+
+
+		stream
+				// testing built-in aggregate
+				.min(1)
+				// failure generation
+				.map(new OnceFailingIdentityMapFunction(NUM_INPUT))
+				.groupBy(0)
+				.addSink(new MinEvictingQueueSink());
+
+		stream
+				// testing UDF reducer
+				.reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
+					@Override
+					public Tuple2<Integer, Long> reduce(
+							Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
+						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+					}
+				})
+				.groupBy(0)
+				.addSink(new SumEvictingQueueSink());
+
+		stream
+				// testing UDF folder
+				.fold(Tuple2.of(0, 0L), new FoldFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>() {
+					@Override
+					public Tuple2<Integer, Long> fold(
+							Tuple2<Integer, Long> accumulator, Tuple2<Integer, Long> value) throws Exception {
+						return Tuple2.of(value.f0, accumulator.f1 + value.f1);
+					}
+				})
+				.groupBy(0)
+				.addSink(new FoldEvictingQueueSink());
+	}
+
+	@Override
+	public void postSubmit() {
+
+		// Note that these checks depend on the ordering of the input
+
+		// Checking the result of the built-in aggregate
+		for (int i = 0; i < PARALLELISM; i++) {
+			for (Long value : MinEvictingQueueSink.queues[i]) {
+				Assert.assertTrue("Value different from 1 found, was " + value + ".", value == 1);
+			}
+		}
+
+		// Checking the result of the UDF reducer
+		for (int i = 0; i < PARALLELISM; i++) {
+			long prevCount = NUM_INPUT - NUM_OUTPUT;
+			long sum = prevCount * (prevCount + 1) / 2;
+			while (!SumEvictingQueueSink.queues[i].isEmpty()) {
+				sum += ++prevCount;
+				Long value = SumEvictingQueueSink.queues[i].remove();
+				Assert.assertTrue("Unexpected reduce value " + value + " instead of " + sum + ".", value == sum);
+			}
+		}
+
+		// Checking the result of the UDF folder
+		for (int i = 0; i < PARALLELISM; i++) {
+			long prevCount = NUM_INPUT - NUM_OUTPUT;
+			long sum = prevCount * (prevCount + 1) / 2;
+			while (!FoldEvictingQueueSink.queues[i].isEmpty()) {
+				sum += ++prevCount;
+				Long value = FoldEvictingQueueSink.queues[i].remove();
+				Assert.assertTrue("Unexpected fold value " + value + " instead of " + sum + ".", value == sum);
+			}
+		}
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom Functions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Produces the same sequence once for each parallel instance of the downstream operators,
+	 * with every record augmented by the index of the subtask it is destined for. The source is
+	 * not parallel, to ensure ordering.
+	 */
+	private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>>{
+
+		private transient OperatorState<Long> count;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			count = getRuntimeContext().getOperatorState("count", 0L, false);
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
+			Object lock = ctx.getCheckpointLock();
+
+			while (count.value() < NUM_INPUT){
+				synchronized (lock){
+					for (int i = 0; i < PARALLELISM; i++) {
+						ctx.collect(Tuple2.of(i, count.value() + 1));
+					}
+					count.update(count.value() + 1);
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+		}
+	}
+
+	/**
+	 * Mapper that fails exactly once, after seeing between 40% and 70% of the records.
+	 */
+	private static class OnceFailingIdentityMapFunction
+			extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
+
+		private static volatile boolean hasFailed = false;
+
+		private final long numElements;
+
+		private long failurePos;
+		private OperatorState<Long> count;
+
+		public OnceFailingIdentityMapFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+			count = getRuntimeContext().getOperatorState("count", 0L, false);
+		}
+
+		@Override
+		public Tuple2<Integer, Long> map(Tuple2<Integer, Long> value) throws Exception {
+			if (!hasFailed && count.value() >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+			count.update(count.value() + 1);
+			return value;
+		}
+
+	}
+
+	/**
+	 * Sink that emits the output to an evicting queue storing the last {@link #NUM_OUTPUT} elements.
+	 * A separate queue is created for each group; apply a grouping prior to this operator to avoid
+	 * parallel access to the queues.
+	 */
+	private static class MinEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
+
+		public static Queue<Long>[] queues = new Queue[PARALLELISM];
+
+		@Override
+		public void invoke(Tuple2<Integer, Long> value) throws Exception {
+			if (queues[value.f0] == null) {
+				queues[value.f0] = EvictingQueue.create(NUM_OUTPUT);
+			}
+			queues[value.f0].add(value.f1);
+		}
+	}
+
+	/**
+	 * Sink that emits the output to an evicting queue storing the last {@link #NUM_OUTPUT} elements.
+	 * A separate queue is created for each group; apply a grouping prior to this operator to avoid
+	 * parallel access to the queues.
+	 */
+	private static class SumEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
+
+		public static Queue<Long>[] queues = new Queue[PARALLELISM];
+
+		@Override
+		public void invoke(Tuple2<Integer, Long> value) throws Exception {
+			if (queues[value.f0] == null) {
+				queues[value.f0] = EvictingQueue.create(NUM_OUTPUT);
+			}
+			queues[value.f0].add(value.f1);
+		}
+	}
+
+	/**
+	 * Sink that emits the output to an evicting queue storing the last {@link #NUM_OUTPUT} elements.
+	 * A separate queue is created for each group; apply a grouping prior to this operator to avoid
+	 * parallel access to the queues.
+	 */
+	private static class FoldEvictingQueueSink implements SinkFunction<Tuple2<Integer, Long>> {
+
+		public static Queue<Long>[] queues = new Queue[PARALLELISM];
+
+		@Override
+		public void invoke(Tuple2<Integer, Long> value) throws Exception {
+			if (queues[value.f0] == null) {
+				queues[value.f0] = EvictingQueue.create(NUM_OUTPUT);
+			}
+			queues[value.f0].add(value.f1);
+		}
+	}
+}
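
A detail worth calling out in StatefulMultipleSequence above: each emission and the matching state update happen inside a single synchronized block on ctx.getCheckpointLock(), so a checkpoint can never capture a count that disagrees with the records already emitted. Below is an illustrative reduction of that pattern to a single-output counter; the class name and the bound are mine, but every API call is the same one used in the test.

import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/**
 * Illustrative reduction of StatefulMultipleSequence: a non-parallel counting source
 * whose position lives in non-partitioned OperatorState. Emission and state update
 * share one critical section on the checkpoint lock.
 */
public class CountingSourceSketch extends RichSourceFunction<Long> {

	private static final long NUM_ELEMENTS = 1_000L;   // illustrative bound

	private transient OperatorState<Long> count;

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		// same call as in the test: named state, default value 0L, non-partitioned
		count = getRuntimeContext().getOperatorState("count", 0L, false);
	}

	@Override
	public void run(SourceContext<Long> ctx) throws Exception {
		final Object lock = ctx.getCheckpointLock();
		while (count.value() < NUM_ELEMENTS) {
			synchronized (lock) {
				ctx.collect(count.value() + 1);     // emit the next element ...
				count.update(count.value() + 1);    // ... and advance the state atomically with it
			}
		}
	}

	@Override
	public void cancel() {
	}
}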
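
The three sinks bound their retained output with Guava's EvictingQueue, a fixed-capacity queue that silently drops its oldest element when a new one is added to a full queue, so each queue ends up holding the last NUM_OUTPUT values of its group. A minimal standalone sketch of that behaviour (not part of the commit; the capacity of 3 stands in for NUM_OUTPUT):

import com.google.common.collect.EvictingQueue;
import java.util.Queue;

public class EvictingQueueSketch {

	public static void main(String[] args) {
		Queue<Long> queue = EvictingQueue.create(3);
		for (long i = 1; i <= 5; i++) {
			queue.add(i);   // once the queue is full, add() evicts the oldest element
		}
		System.out.println(queue);   // prints [3, 4, 5]: only the last three values remain
	}
}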
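
Finally, the expected values in postSubmit() follow from the shape of the input: per group the source emits 1, 2, ..., NUM_INPUT, so the built-in min aggregate stays at 1 (the first and smallest value per group), while the UDF reducer and folder both produce the running sums 1, 3, 6, ..., i.e. the triangular numbers k*(k+1)/2. The evicting sinks keep only the last NUM_OUTPUT of those sums, and the check reconstructs that tail from the closed form; if state were lost or duplicated during recovery, the tail would no longer consist of consecutive triangular numbers. A standalone sketch of the same arithmetic with small stand-in values (5 inputs, 2 retained outputs; names are illustrative only):

public class ExpectedTailSketch {

	public static void main(String[] args) {
		long numInput = 5;    // stand-in for NUM_INPUT
		long numOutput = 2;   // stand-in for NUM_OUTPUT

		// Per group the source emits 1, 2, 3, 4, 5; the running sums are the
		// triangular numbers 1, 3, 6, 10, 15; the sink retains only the last two.
		long[] retained = {10L, 15L};

		// Same reconstruction as postSubmit(): start from the (numInput - numOutput)-th
		// triangular number and keep adding the next count.
		long prevCount = numInput - numOutput;         // 3
		long sum = prevCount * (prevCount + 1) / 2;    // 1 + 2 + 3 = 6

		for (long value : retained) {
			sum += ++prevCount;                        // 6 + 4 = 10, then 10 + 5 = 15
			if (value != sum) {
				throw new AssertionError("Unexpected value " + value + " instead of " + sum);
			}
		}
		System.out.println("retained tail matches the closed-form reconstruction");
	}
}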