You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 00:15:48 UTC
[6/8] flink git commit: [FLINK-2406] [streaming] Abstract and improve
stream alignment via the BarrierBuffer
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index cb5e046..ad61c6f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,152 +18,652 @@
package org.apache.flink.streaming.runtime.io;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
-public class BarrierBufferTest {
-
- @Test
- public void testWithoutBarriers() throws IOException, InterruptedException {
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
- List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
- input.add(createBuffer(0));
- input.add(createBuffer(0));
- input.add(createBuffer(0));
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
- InputGate mockIG = new MockInputGate(1, input);
- AbstractReader mockAR = new MockReader(mockIG);
+/**
+ * Tests for the behavior of the {@link BarrierBuffer}.
+ */
+public class BarrierBufferTest {
- BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+ private static int SIZE_COUNTER = 0;
+
+ private static IOManager IO_MANAGER;
- assertEquals(input.get(0), bb.getNextNonBlocked());
- assertEquals(input.get(1), bb.getNextNonBlocked());
- assertEquals(input.get(2), bb.getNextNonBlocked());
+ @BeforeClass
+ public static void setup() {
+ IO_MANAGER = new IOManagerAsync();
+ SIZE_COUNTER = 1;
+ }
- bb.cleanup();
+ @AfterClass
+ public static void shutdownIOManager() {
+ IO_MANAGER.shutdown();
}
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+ /**
+ * Validates that the buffer behaves correctly if no checkpoint barriers come,
+ * for a single input channel.
+ */
@Test
- public void testOneChannelBarrier() throws IOException, InterruptedException {
+ public void testSingleChannelNoBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ createBuffer(0), createBuffer(0), createBuffer(0),
+ createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ for (BufferOrEvent boe : sequence) {
+ assertEquals(boe, buffer.getNextNonBlocked());
+ }
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
- input.add(createBuffer(0));
- input.add(createBuffer(0));
- input.add(createBarrier(1, 0));
- input.add(createBuffer(0));
- input.add(createBuffer(0));
- input.add(createBarrier(2, 0));
- input.add(createBuffer(0));
+ /**
+ * Validates that the buffer behaves correctly if no checkpoint barriers come,
+ * for an input with multiple input channels.
+ */
+ @Test
+ public void testMultiChannelNoBarriers() {
+ try {
+ BufferOrEvent[] sequence = { createBuffer(2), createBuffer(2), createBuffer(0),
+ createBuffer(1), createBuffer(0), createEndOfPartition(0),
+ createBuffer(3), createBuffer(1), createEndOfPartition(3),
+ createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2)
+ };
+
+ MockInputGate gate = new MockInputGate(4, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ for (BufferOrEvent boe : sequence) {
+ assertEquals(boe, buffer.getNextNonBlocked());
+ }
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- InputGate mockIG = new MockInputGate(1, input);
- AbstractReader mockAR = new MockReader(mockIG);
+ /**
+ * Validates that the buffer preserved the order of elements for a
+ * input with a single input channel, and checkpoint events.
+ */
+ @Test
+ public void testSingleChannelWithBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ createBuffer(0), createBuffer(0), createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0),
+ createBarrier(2, 0), createBarrier(3, 0),
+ createBuffer(0), createBuffer(0),
+ createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
+ createBuffer(0), createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+ buffer.registerCheckpointEventHandler(handler);
+ handler.setNextExpectedCheckpointId(1L);
+
+ for (BufferOrEvent boe : sequence) {
+ if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+ assertEquals(boe, buffer.getNextNonBlocked());
+ }
+ }
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
- BufferOrEvent nextBoe;
+ /**
+ * Validates that the buffer correctly aligns the streams for inputs with
+ * multiple input channels, by buffering and blocking certain inputs.
+ */
+ @Test
+ public void testMultiChannelWithBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ // checkpoint with blocked data
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1), createBuffer(0),
+ createBarrier(1, 0),
+
+ // checkpoint without blocked data
+ createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
+ createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
+
+ // checkpoint with data only from one channel
+ createBuffer(2), createBuffer(2),
+ createBarrier(3, 2),
+ createBuffer(2), createBuffer(2),
+ createBarrier(3, 0), createBarrier(3, 1),
+
+ // empty checkpoint
+ createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0),
+
+ // checkpoint with blocked data in mixed order
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(5, 1),
+ createBuffer(2), createBuffer(0), createBuffer(2), createBuffer(1),
+ createBarrier(5, 2),
+ createBuffer(1), createBuffer(0), createBuffer(2), createBuffer(1),
+ createBarrier(5, 0),
+
+ // some trailing data
+ createBuffer(0),
+ createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+ buffer.registerCheckpointEventHandler(handler);
+ handler.setNextExpectedCheckpointId(1L);
+
+ // pre checkpoint 1
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+ // blocking while aligning for checkpoint 1
+ check(sequence[7], buffer.getNextNonBlocked());
+ assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+ // checkpoint 1 done, returning buffered data
+ check(sequence[5], buffer.getNextNonBlocked());
+ assertEquals(2L, handler.getNextExpectedCheckpointId());
+ check(sequence[6], buffer.getNextNonBlocked());
+
+ // pre checkpoint 2
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[10], buffer.getNextNonBlocked());
+ check(sequence[11], buffer.getNextNonBlocked());
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[13], buffer.getNextNonBlocked());
+ assertEquals(2L, handler.getNextExpectedCheckpointId());
+
+ // checkpoint 2 barriers come together
+ check(sequence[17], buffer.getNextNonBlocked());
+ assertEquals(3L, handler.getNextExpectedCheckpointId());
+ check(sequence[18], buffer.getNextNonBlocked());
+
+ // checkpoint 3 starts, data buffered
+ check(sequence[20], buffer.getNextNonBlocked());
+ assertEquals(4L, handler.getNextExpectedCheckpointId());
+ check(sequence[21], buffer.getNextNonBlocked());
+
+ // checkpoint 4 happens without extra data
+
+ // pre checkpoint 5
+ check(sequence[27], buffer.getNextNonBlocked());
+ assertEquals(5L, handler.getNextExpectedCheckpointId());
+ check(sequence[28], buffer.getNextNonBlocked());
+ check(sequence[29], buffer.getNextNonBlocked());
+
+ // checkpoint 5 aligning
+ check(sequence[31], buffer.getNextNonBlocked());
+ check(sequence[32], buffer.getNextNonBlocked());
+ check(sequence[33], buffer.getNextNonBlocked());
+ check(sequence[37], buffer.getNextNonBlocked());
+
+ // buffered data from checkpoint 5 alignment
+ check(sequence[34], buffer.getNextNonBlocked());
+ check(sequence[36], buffer.getNextNonBlocked());
+ check(sequence[38], buffer.getNextNonBlocked());
+ check(sequence[39], buffer.getNextNonBlocked());
+
+ // remaining data
+ check(sequence[41], buffer.getNextNonBlocked());
+ check(sequence[42], buffer.getNextNonBlocked());
+ check(sequence[43], buffer.getNextNonBlocked());
+ check(sequence[44], buffer.getNextNonBlocked());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+ @Test
+ public void testMultiChannelTrailingBlockedData() {
+ try {
+ BufferOrEvent[] sequence = {
+ createBuffer(0), createBuffer(1), createBuffer(2),
+ createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
+
+ createBuffer(2), createBuffer(1), createBuffer(0),
+ createBarrier(2, 1),
+ createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
+ createBarrier(2, 2),
+ createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+ buffer.registerCheckpointEventHandler(handler);
+ handler.setNextExpectedCheckpointId(1L);
+
+ // pre-checkpoint 1
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+ // pre-checkpoint 2
+ check(sequence[6], buffer.getNextNonBlocked());
+ assertEquals(2L, handler.getNextExpectedCheckpointId());
+ check(sequence[7], buffer.getNextNonBlocked());
+ check(sequence[8], buffer.getNextNonBlocked());
+
+ // checkpoint 2 alignment
+ check(sequence[13], buffer.getNextNonBlocked());
+ check(sequence[14], buffer.getNextNonBlocked());
+ check(sequence[18], buffer.getNextNonBlocked());
+ check(sequence[19], buffer.getNextNonBlocked());
+
+ // end of stream: remaining buffered contents
+ check(sequence[10], buffer.getNextNonBlocked());
+ check(sequence[11], buffer.getNextNonBlocked());
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[16], buffer.getNextNonBlocked());
+ check(sequence[17], buffer.getNextNonBlocked());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- bb.cleanup();
+ /**
+ * Validates that the buffer correctly aligns the streams in cases
+ * where some channels receive barriers from multiple successive checkpoints
+ * before the pending checkpoint is complete.
+ */
+ @Test
+ public void testMultiChannelWithQueuedFutureBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ // checkpoint 1 - with blocked data
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1), createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(1), createBuffer(0),
+
+ // checkpoint 2 - where future checkpoint barriers come before
+ // the current checkpoint is complete
+ createBarrier(2, 1),
+ createBuffer(1), createBuffer(2), createBarrier(2, 0),
+ createBarrier(3, 0), createBuffer(0),
+ createBarrier(3, 1), createBuffer(0), createBuffer(1), createBuffer(2),
+ createBarrier(4, 1), createBuffer(1), createBuffer(2),
+
+ // complete checkpoint 2, send a barrier for checkpoints 4 and 5
+ createBarrier(2, 2),
+ createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
+ createBarrier(4, 0),
+ createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
+ createBarrier(5, 1),
+
+ // complete checkpoint 3
+ createBarrier(3, 2),
+ createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
+ createBarrier(6, 1),
+
+ // complete checkpoint 4, checkpoint 5 remains not fully triggered
+ createBarrier(4, 2),
+ createBuffer(2),
+ createBuffer(1), createEndOfPartition(1),
+ createBuffer(2), createEndOfPartition(2),
+ createBuffer(0), createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+ buffer.registerCheckpointEventHandler(handler);
+ handler.setNextExpectedCheckpointId(1L);
+
+ // around checkpoint 1
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ check(sequence[7], buffer.getNextNonBlocked());
+
+ check(sequence[5], buffer.getNextNonBlocked());
+ assertEquals(2L, handler.getNextExpectedCheckpointId());
+ check(sequence[6], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[10], buffer.getNextNonBlocked());
+
+ // alignment of checkpoint 2 - buffering also some barriers for
+ // checkpoints 3 and 4
+ check(sequence[13], buffer.getNextNonBlocked());
+ check(sequence[20], buffer.getNextNonBlocked());
+ check(sequence[23], buffer.getNextNonBlocked());
+
+ // checkpoint 2 completed
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[25], buffer.getNextNonBlocked());
+ check(sequence[27], buffer.getNextNonBlocked());
+ check(sequence[30], buffer.getNextNonBlocked());
+ check(sequence[32], buffer.getNextNonBlocked());
+
+ // checkpoint 3 completed (emit buffered)
+ check(sequence[16], buffer.getNextNonBlocked());
+ check(sequence[18], buffer.getNextNonBlocked());
+ check(sequence[19], buffer.getNextNonBlocked());
+ check(sequence[28], buffer.getNextNonBlocked());
+
+ // past checkpoint 3
+ check(sequence[36], buffer.getNextNonBlocked());
+ check(sequence[38], buffer.getNextNonBlocked());
+
+ // checkpoint 4 completed (emit buffered)
+ check(sequence[22], buffer.getNextNonBlocked());
+ check(sequence[26], buffer.getNextNonBlocked());
+ check(sequence[31], buffer.getNextNonBlocked());
+ check(sequence[33], buffer.getNextNonBlocked());
+ check(sequence[39], buffer.getNextNonBlocked());
+
+ // past checkpoint 4, alignment for checkpoint 5
+ check(sequence[42], buffer.getNextNonBlocked());
+ check(sequence[45], buffer.getNextNonBlocked());
+ check(sequence[46], buffer.getNextNonBlocked());
+ check(sequence[47], buffer.getNextNonBlocked());
+ check(sequence[48], buffer.getNextNonBlocked());
+
+ // end of input, emit remainder
+ check(sequence[37], buffer.getNextNonBlocked());
+ check(sequence[43], buffer.getNextNonBlocked());
+ check(sequence[44], buffer.getNextNonBlocked());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
+ /**
+ * Validates that the buffer skips over the current checkpoint if it
+ * receives a barrier from a later checkpoint on a non-blocked input.
+ */
@Test
- public void testMultiChannelBarrier() throws IOException, InterruptedException {
-
- List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
- input.add(createBuffer(0));
- input.add(createBuffer(1));
- input.add(createBarrier(1, 0));
- input.add(createBarrier(2, 0));
- input.add(createBuffer(0));
- input.add(createBarrier(3, 0));
- input.add(createBuffer(0));
- input.add(createBuffer(1));
- input.add(createBarrier(1, 1));
- input.add(createBuffer(0));
- input.add(createBuffer(1));
- input.add(createBarrier(2, 1));
- input.add(createBarrier(3, 1));
- input.add(createBarrier(4, 0));
- input.add(createBuffer(0));
- input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1));
-
+ public void testMultiChannelSkippingCheckpoints() {
+ try {
+ BufferOrEvent[] sequence = {
+ // checkpoint 1 - with blocked data
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1), createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(1), createBuffer(0),
+
+ // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3
+ createBarrier(2, 1),
+ createBuffer(1), createBuffer(2),
+ createBarrier(2, 0),
+ createBuffer(2), createBuffer(0),
+ createBarrier(3, 2),
+
+ createBuffer(2),
+ createBuffer(1), createEndOfPartition(1),
+ createBuffer(2), createEndOfPartition(2),
+ createBuffer(0), createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+ buffer.registerCheckpointEventHandler(handler);
+ handler.setNextExpectedCheckpointId(1L);
+
+ // checkpoint 1
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ check(sequence[7], buffer.getNextNonBlocked());
+ assertEquals(1L, buffer.getCurrentCheckpointId());
+
+ check(sequence[5], buffer.getNextNonBlocked());
+ check(sequence[6], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[10], buffer.getNextNonBlocked());
+
+ // alignment of checkpoint 2
+ check(sequence[13], buffer.getNextNonBlocked());
+ assertEquals(2L, buffer.getCurrentCheckpointId());
+ check(sequence[15], buffer.getNextNonBlocked());
+
+ // checkpoint 2 aborted, checkpoint 3 started
+ check(sequence[12], buffer.getNextNonBlocked());
+ assertEquals(3L, buffer.getCurrentCheckpointId());
+ check(sequence[16], buffer.getNextNonBlocked());
+ check(sequence[19], buffer.getNextNonBlocked());
+ check(sequence[20], buffer.getNextNonBlocked());
+ check(sequence[23], buffer.getNextNonBlocked());
+ check(sequence[24], buffer.getNextNonBlocked());
+
+ // end of input, emit remainder
+ check(sequence[18], buffer.getNextNonBlocked());
+ check(sequence[21], buffer.getNextNonBlocked());
+ check(sequence[22], buffer.getNextNonBlocked());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- InputGate mockIG1 = new MockInputGate(2, input);
- AbstractReader mockAR1 = new MockReader(mockIG1);
+ /**
+ * Validates that the buffer skips over a later checkpoint if it
+ * receives a barrier from an even later checkpoint on a blocked input.
+ *
+ * NOTE: This test currently fails, because the barrier buffer does not support
+ * to unblock inputs before all previously unblocked data is consumed.
+ *
+ * Since this test checks only that the buffer behaves "failsafe" in cases of
+ * corrupt checkpoint barrier propagation (a situation that does not occur
+ * under the current model), we ignore it for the moment.
+ */
+// @Test
+ public void testMultiChannelSkippingCheckpointsViaBlockedInputs() {
+ try {
+ BufferOrEvent[] sequence = {
+ // checkpoint 1 - with blocked data
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1), createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(1), createBuffer(0),
+
+ // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3
+ createBarrier(2, 1),
+ createBuffer(1), createBuffer(2),
+ createBarrier(2, 0),
+ createBuffer(1), createBuffer(0),
+
+ createBarrier(4, 1), // pre-mature barrier on blocked input
+ createBarrier(3, 0), // queued barrier, ignored on replay
+
+ // complete checkpoint 2
+ createBarrier(2, 0),
+ createBuffer(0),
+
+ createBarrier(3, 2), // should be ignored
+ createBuffer(2),
+ createBarrier(4, 0),
+ createBuffer(0), createBuffer(1), createBuffer(2),
+ createBarrier(4, 1),
+
+ createBuffer(1), createEndOfPartition(1),
+ createBuffer(2), createEndOfPartition(2),
+ createBuffer(0), createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ // checkpoint 1
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ check(sequence[7], buffer.getNextNonBlocked());
+ assertEquals(1L, buffer.getCurrentCheckpointId());
+ check(sequence[5], buffer.getNextNonBlocked());
+ check(sequence[6], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[10], buffer.getNextNonBlocked());
+
+ // alignment of checkpoint 2
+ check(sequence[13], buffer.getNextNonBlocked());
+ assertEquals(2L, buffer.getCurrentCheckpointId());
+
+ // checkpoint 2 completed
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[15], buffer.getNextNonBlocked());
+ check(sequence[16], buffer.getNextNonBlocked());
+
+ // checkpoint 3 skipped, alignment for 4 started
+ check(sequence[20], buffer.getNextNonBlocked());
+ assertEquals(4L, buffer.getCurrentCheckpointId());
+ check(sequence[22], buffer.getNextNonBlocked());
+ check(sequence[26], buffer.getNextNonBlocked());
+
+ // checkpoint 4 completed
+ check(sequence[24], buffer.getNextNonBlocked());
+ check(sequence[25], buffer.getNextNonBlocked());
+
+ check(sequence[28], buffer.getNextNonBlocked());
+ check(sequence[29], buffer.getNextNonBlocked());
+ check(sequence[30], buffer.getNextNonBlocked());
+ check(sequence[31], buffer.getNextNonBlocked());
+ check(sequence[32], buffer.getNextNonBlocked());
+ check(sequence[33], buffer.getNextNonBlocked());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
- BufferOrEvent nextBoe;
+ // ------------------------------------------------------------------------
+ // Utils
+ // ------------------------------------------------------------------------
- check(input.get(0), nextBoe = bb.getNextNonBlocked());
- check(input.get(1), nextBoe = bb.getNextNonBlocked());
- check(input.get(2), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(7), nextBoe = bb.getNextNonBlocked());
- check(input.get(8), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(3), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(10), nextBoe = bb.getNextNonBlocked());
- check(input.get(11), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(4), nextBoe = bb.getNextNonBlocked());
- check(input.get(5), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(12), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(6), nextBoe = bb.getNextNonBlocked());
- check(input.get(9), nextBoe = bb.getNextNonBlocked());
- check(input.get(13), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(14), nextBoe = bb.getNextNonBlocked());
- check(input.get(15), nextBoe = bb.getNextNonBlocked());
+ private static BufferOrEvent createBarrier(long id, int channel) {
+ return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+ }
- bb.cleanup();
+ private static BufferOrEvent createBuffer(int channel) {
+ // since we have no access to the contents, we need to use the size as an
+ // identifier to validate correctness here
+ return new BufferOrEvent(
+ new Buffer(new MemorySegment(new byte[SIZE_COUNTER++]), DummyBufferRecycler.INSTANCE), channel);
}
- private static void check(BufferOrEvent expected, BufferOrEvent actual) {
- assertEquals(expected.isBuffer(), actual.isBuffer());
- assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
- if (expected.isEvent()) {
- assertEquals(expected.getEvent(), actual.getEvent());
+ private static BufferOrEvent createEndOfPartition(int channel) {
+ return new BufferOrEvent(EndOfPartitionEvent.INSTANCE, channel);
+ }
+
+ private static void check(BufferOrEvent expected, BufferOrEvent present) {
+ assertNotNull(expected);
+ assertNotNull(present);
+ assertEquals(expected.isBuffer(), present.isBuffer());
+
+ if (expected.isBuffer()) {
+ // since we have no access to the contents, we need to use the size as an
+ // identifier to validate correctness here
+ assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
+ }
+ else {
+ assertEquals(expected.getEvent(), present.getEvent());
}
}
+
+ // ------------------------------------------------------------------------
+ // Testing Mocks
+ // ------------------------------------------------------------------------
- protected static class MockInputGate implements InputGate {
+ private static class MockInputGate implements InputGate {
- private int numChannels;
- private Queue<BufferOrEvent> boes;
+ private final int numChannels;
+ private final Queue<BufferOrEvent> boes;
public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
this.numChannels = numChannels;
- this.boes = new LinkedList<BufferOrEvent>(boes);
+ this.boes = new ArrayDeque<BufferOrEvent>(boes);
}
@Override
@@ -176,48 +677,38 @@ public class BarrierBufferTest {
}
@Override
- public void requestPartitions() throws IOException, InterruptedException {
- }
+ public void requestPartitions() {}
@Override
- public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
- return boes.remove();
+ public BufferOrEvent getNextBufferOrEvent() {
+ return boes.poll();
}
@Override
- public void sendTaskEvent(TaskEvent event) throws IOException {
- }
+ public void sendTaskEvent(TaskEvent event) {}
@Override
- public void registerListener(EventListener<InputGate> listener) {
- }
-
+ public void registerListener(EventListener<InputGate> listener) {}
}
- protected static class MockReader extends AbstractReader {
+ private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
+
+ private long nextExpectedCheckpointId = -1L;
- protected MockReader(InputGate inputGate) {
- super(inputGate);
+ public void setNextExpectedCheckpointId(long nextExpectedCheckpointId) {
+ this.nextExpectedCheckpointId = nextExpectedCheckpointId;
}
- @Override
- public void setReporter(AccumulatorRegistry.Reporter reporter) {
-
+ public long getNextExpectedCheckpointId() {
+ return nextExpectedCheckpointId;
}
- }
-
- protected static BufferOrEvent createBarrier(long id, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
- }
- protected static BufferOrEvent createBuffer(int channel) {
- return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
- new BufferRecycler() {
-
- @Override
- public void recycle(MemorySegment memorySegment) {
- }
- }), channel);
+ @Override
+ public void onEvent(CheckpointBarrier barrier) {
+ assertNotNull(barrier);
+ assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
+ assertTrue(barrier.getTimestamp() > 0);
+ nextExpectedCheckpointId++;
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
index 23ca86d..3f815ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
@@ -25,10 +25,10 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
* A BufferRecycler that does nothing.
*/
public class DummyBufferRecycler implements BufferRecycler {
-
+
public static final BufferRecycler INSTANCE = new DummyBufferRecycler();
-
-
+
+
@Override
public void recycle(MemorySegment memorySegment) {}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
index 9934bd9..b6cd656 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
@@ -22,18 +22,36 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
public class SpillingBufferOrEventTest {
+
+ private static IOManager IO_MANAGER;
+
+ @BeforeClass
+ public static void createIOManager() {
+ IO_MANAGER = new IOManagerAsync();
+ }
+
+ @AfterClass
+ public static void shutdownIOManager() {
+ IO_MANAGER.shutdown();
+ }
+ // ------------------------------------------------------------------------
+
@Test
public void testSpilling() throws IOException, InterruptedException {
- BufferSpiller bsp = new BufferSpiller();
+ BufferSpiller bsp = new BufferSpiller(IO_MANAGER);
SpillReader spr = new SpillReader();
BufferPool pool1 = new NetworkBufferPool(10, 256).createBufferPool(2, true);