You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:40 UTC
[24/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
deleted file mode 100644
index d4fdc59..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ /dev/null
@@ -1,954 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the behavior of the {@link BarrierBuffer}.
- */
-public class BarrierBufferTest {
-
- private static final int PAGE_SIZE = 512;
-
- private static int SIZE_COUNTER = 0;
-
- private static IOManager IO_MANAGER;
-
- @BeforeClass
- public static void setup() {
- IO_MANAGER = new IOManagerAsync();
- SIZE_COUNTER = 1;
- }
-
- @AfterClass
- public static void shutdownIOManager() {
- IO_MANAGER.shutdown();
- }
-
- // ------------------------------------------------------------------------
- // Tests
- // ------------------------------------------------------------------------
-
- /**
- * Validates that the buffer behaves correctly if no checkpoint barriers come,
- * for a single input channel.
- */
- @Test
- public void testSingleChannelNoBarriers() {
- try {
- BufferOrEvent[] sequence = {
- createBuffer(0), createBuffer(0), createBuffer(0),
- createEndOfPartition(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- for (BufferOrEvent boe : sequence) {
- assertEquals(boe, buffer.getNextNonBlocked());
- }
-
- assertNull(buffer.getNextNonBlocked());
- assertNull(buffer.getNextNonBlocked());
-
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Validates that the buffer behaves correctly if no checkpoint barriers come,
- * for an input with multiple input channels.
- */
- @Test
- public void testMultiChannelNoBarriers() {
- try {
- BufferOrEvent[] sequence = { createBuffer(2), createBuffer(2), createBuffer(0),
- createBuffer(1), createBuffer(0), createEndOfPartition(0),
- createBuffer(3), createBuffer(1), createEndOfPartition(3),
- createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- for (BufferOrEvent boe : sequence) {
- assertEquals(boe, buffer.getNextNonBlocked());
- }
-
- assertNull(buffer.getNextNonBlocked());
- assertNull(buffer.getNextNonBlocked());
-
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Validates that the buffer preserved the order of elements for a
- * input with a single input channel, and checkpoint events.
- */
- @Test
- public void testSingleChannelWithBarriers() {
- try {
- BufferOrEvent[] sequence = {
- createBuffer(0), createBuffer(0), createBuffer(0),
- createBarrier(1, 0),
- createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0),
- createBarrier(2, 0), createBarrier(3, 0),
- createBuffer(0), createBuffer(0),
- createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
- createBuffer(0), createEndOfPartition(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
- handler.setNextExpectedCheckpointId(1L);
-
- for (BufferOrEvent boe : sequence) {
- if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
- assertEquals(boe, buffer.getNextNonBlocked());
- }
- }
-
- assertNull(buffer.getNextNonBlocked());
- assertNull(buffer.getNextNonBlocked());
-
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Validates that the buffer correctly aligns the streams for inputs with
- * multiple input channels, by buffering and blocking certain inputs.
- */
- @Test
- public void testMultiChannelWithBarriers() {
- try {
- BufferOrEvent[] sequence = {
- // checkpoint with blocked data
- createBuffer(0), createBuffer(2), createBuffer(0),
- createBarrier(1, 1), createBarrier(1, 2),
- createBuffer(2), createBuffer(1), createBuffer(0),
- createBarrier(1, 0),
-
- // checkpoint without blocked data
- createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
- createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
-
- // checkpoint with data only from one channel
- createBuffer(2), createBuffer(2),
- createBarrier(3, 2),
- createBuffer(2), createBuffer(2),
- createBarrier(3, 0), createBarrier(3, 1),
-
- // empty checkpoint
- createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0),
-
- // checkpoint with blocked data in mixed order
- createBuffer(0), createBuffer(2), createBuffer(0),
- createBarrier(5, 1),
- createBuffer(2), createBuffer(0), createBuffer(2), createBuffer(1),
- createBarrier(5, 2),
- createBuffer(1), createBuffer(0), createBuffer(2), createBuffer(1),
- createBarrier(5, 0),
-
- // some trailing data
- createBuffer(0),
- createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
- handler.setNextExpectedCheckpointId(1L);
-
- // pre checkpoint 1
- check(sequence[0], buffer.getNextNonBlocked());
- check(sequence[1], buffer.getNextNonBlocked());
- check(sequence[2], buffer.getNextNonBlocked());
- assertEquals(1L, handler.getNextExpectedCheckpointId());
-
- // blocking while aligning for checkpoint 1
- check(sequence[7], buffer.getNextNonBlocked());
- assertEquals(1L, handler.getNextExpectedCheckpointId());
-
- // checkpoint 1 done, returning buffered data
- check(sequence[5], buffer.getNextNonBlocked());
- assertEquals(2L, handler.getNextExpectedCheckpointId());
- check(sequence[6], buffer.getNextNonBlocked());
-
- // pre checkpoint 2
- check(sequence[9], buffer.getNextNonBlocked());
- check(sequence[10], buffer.getNextNonBlocked());
- check(sequence[11], buffer.getNextNonBlocked());
- check(sequence[12], buffer.getNextNonBlocked());
- check(sequence[13], buffer.getNextNonBlocked());
- assertEquals(2L, handler.getNextExpectedCheckpointId());
-
- // checkpoint 2 barriers come together
- check(sequence[17], buffer.getNextNonBlocked());
- assertEquals(3L, handler.getNextExpectedCheckpointId());
- check(sequence[18], buffer.getNextNonBlocked());
-
- // checkpoint 3 starts, data buffered
- check(sequence[20], buffer.getNextNonBlocked());
- assertEquals(4L, handler.getNextExpectedCheckpointId());
- check(sequence[21], buffer.getNextNonBlocked());
-
- // checkpoint 4 happens without extra data
-
- // pre checkpoint 5
- check(sequence[27], buffer.getNextNonBlocked());
- assertEquals(5L, handler.getNextExpectedCheckpointId());
- check(sequence[28], buffer.getNextNonBlocked());
- check(sequence[29], buffer.getNextNonBlocked());
-
- // checkpoint 5 aligning
- check(sequence[31], buffer.getNextNonBlocked());
- check(sequence[32], buffer.getNextNonBlocked());
- check(sequence[33], buffer.getNextNonBlocked());
- check(sequence[37], buffer.getNextNonBlocked());
-
- // buffered data from checkpoint 5 alignment
- check(sequence[34], buffer.getNextNonBlocked());
- check(sequence[36], buffer.getNextNonBlocked());
- check(sequence[38], buffer.getNextNonBlocked());
- check(sequence[39], buffer.getNextNonBlocked());
-
- // remaining data
- check(sequence[41], buffer.getNextNonBlocked());
- check(sequence[42], buffer.getNextNonBlocked());
- check(sequence[43], buffer.getNextNonBlocked());
- check(sequence[44], buffer.getNextNonBlocked());
-
- assertNull(buffer.getNextNonBlocked());
- assertNull(buffer.getNextNonBlocked());
-
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testMultiChannelTrailingBlockedData() {
- try {
- BufferOrEvent[] sequence = {
- createBuffer(0), createBuffer(1), createBuffer(2),
- createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
-
- createBuffer(2), createBuffer(1), createBuffer(0),
- createBarrier(2, 1),
- createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
- createBarrier(2, 2),
- createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
- handler.setNextExpectedCheckpointId(1L);
-
- // pre-checkpoint 1
- check(sequence[0], buffer.getNextNonBlocked());
- check(sequence[1], buffer.getNextNonBlocked());
- check(sequence[2], buffer.getNextNonBlocked());
- assertEquals(1L, handler.getNextExpectedCheckpointId());
-
- // pre-checkpoint 2
- check(sequence[6], buffer.getNextNonBlocked());
- assertEquals(2L, handler.getNextExpectedCheckpointId());
- check(sequence[7], buffer.getNextNonBlocked());
- check(sequence[8], buffer.getNextNonBlocked());
-
- // checkpoint 2 alignment
- check(sequence[13], buffer.getNextNonBlocked());
- check(sequence[14], buffer.getNextNonBlocked());
- check(sequence[18], buffer.getNextNonBlocked());
- check(sequence[19], buffer.getNextNonBlocked());
-
- // end of stream: remaining buffered contents
- check(sequence[10], buffer.getNextNonBlocked());
- check(sequence[11], buffer.getNextNonBlocked());
- check(sequence[12], buffer.getNextNonBlocked());
- check(sequence[16], buffer.getNextNonBlocked());
- check(sequence[17], buffer.getNextNonBlocked());
-
- assertNull(buffer.getNextNonBlocked());
- assertNull(buffer.getNextNonBlocked());
-
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Validates that the buffer correctly aligns the streams in cases
- * where some channels receive barriers from multiple successive checkpoints
- * before the pending checkpoint is complete.
- */
- @Test
- public void testMultiChannelWithQueuedFutureBarriers() {
- try {
- BufferOrEvent[] sequence = {
- // checkpoint 1 - with blocked data
- createBuffer(0), createBuffer(2), createBuffer(0),
- createBarrier(1, 1), createBarrier(1, 2),
- createBuffer(2), createBuffer(1), createBuffer(0),
- createBarrier(1, 0),
- createBuffer(1), createBuffer(0),
-
- // checkpoint 2 - where future checkpoint barriers come before
- // the current checkpoint is complete
- createBarrier(2, 1),
- createBuffer(1), createBuffer(2), createBarrier(2, 0),
- createBarrier(3, 0), createBuffer(0),
- createBarrier(3, 1), createBuffer(0), createBuffer(1), createBuffer(2),
- createBarrier(4, 1), createBuffer(1), createBuffer(2),
-
- // complete checkpoint 2, send a barrier for checkpoints 4 and 5
- createBarrier(2, 2),
- createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
- createBarrier(4, 0),
- createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
- createBarrier(5, 1),
-
- // complete checkpoint 3
- createBarrier(3, 2),
- createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
- createBarrier(6, 1),
-
- // complete checkpoint 4, checkpoint 5 remains not fully triggered
- createBarrier(4, 2),
- createBuffer(2),
- createBuffer(1), createEndOfPartition(1),
- createBuffer(2), createEndOfPartition(2),
- createBuffer(0), createEndOfPartition(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
- handler.setNextExpectedCheckpointId(1L);
-
- // around checkpoint 1
- check(sequence[0], buffer.getNextNonBlocked());
- check(sequence[1], buffer.getNextNonBlocked());
- check(sequence[2], buffer.getNextNonBlocked());
- check(sequence[7], buffer.getNextNonBlocked());
-
- check(sequence[5], buffer.getNextNonBlocked());
- assertEquals(2L, handler.getNextExpectedCheckpointId());
- check(sequence[6], buffer.getNextNonBlocked());
- check(sequence[9], buffer.getNextNonBlocked());
- check(sequence[10], buffer.getNextNonBlocked());
-
- // alignment of checkpoint 2 - buffering also some barriers for
- // checkpoints 3 and 4
- check(sequence[13], buffer.getNextNonBlocked());
- check(sequence[20], buffer.getNextNonBlocked());
- check(sequence[23], buffer.getNextNonBlocked());
-
- // checkpoint 2 completed
- check(sequence[12], buffer.getNextNonBlocked());
- check(sequence[25], buffer.getNextNonBlocked());
- check(sequence[27], buffer.getNextNonBlocked());
- check(sequence[30], buffer.getNextNonBlocked());
- check(sequence[32], buffer.getNextNonBlocked());
-
- // checkpoint 3 completed (emit buffered)
- check(sequence[16], buffer.getNextNonBlocked());
- check(sequence[18], buffer.getNextNonBlocked());
- check(sequence[19], buffer.getNextNonBlocked());
- check(sequence[28], buffer.getNextNonBlocked());
-
- // past checkpoint 3
- check(sequence[36], buffer.getNextNonBlocked());
- check(sequence[38], buffer.getNextNonBlocked());
-
- // checkpoint 4 completed (emit buffered)
- check(sequence[22], buffer.getNextNonBlocked());
- check(sequence[26], buffer.getNextNonBlocked());
- check(sequence[31], buffer.getNextNonBlocked());
- check(sequence[33], buffer.getNextNonBlocked());
- check(sequence[39], buffer.getNextNonBlocked());
-
- // past checkpoint 4, alignment for checkpoint 5
- check(sequence[42], buffer.getNextNonBlocked());
- check(sequence[45], buffer.getNextNonBlocked());
- check(sequence[46], buffer.getNextNonBlocked());
-
- // abort checkpoint 5 (end of partition)
- check(sequence[37], buffer.getNextNonBlocked());
-
- // start checkpoint 6 alignment
- check(sequence[47], buffer.getNextNonBlocked());
- check(sequence[48], buffer.getNextNonBlocked());
-
- // end of input, emit remainder
- check(sequence[43], buffer.getNextNonBlocked());
- check(sequence[44], buffer.getNextNonBlocked());
-
- assertNull(buffer.getNextNonBlocked());
- assertNull(buffer.getNextNonBlocked());
-
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Validates that the buffer skips over the current checkpoint if it
- * receives a barrier from a later checkpoint on a non-blocked input.
- */
- @Test
- public void testMultiChannelSkippingCheckpoints() {
- try {
- BufferOrEvent[] sequence = {
- // checkpoint 1 - with blocked data
- createBuffer(0), createBuffer(2), createBuffer(0),
- createBarrier(1, 1), createBarrier(1, 2),
- createBuffer(2), createBuffer(1), createBuffer(0),
- createBarrier(1, 0),
- createBuffer(1), createBuffer(0),
-
- // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3
- createBarrier(2, 1),
- createBuffer(1), createBuffer(2),
- createBarrier(2, 0),
- createBuffer(2), createBuffer(0),
- createBarrier(3, 2),
-
- createBuffer(2),
- createBuffer(1), createEndOfPartition(1),
- createBuffer(2), createEndOfPartition(2),
- createBuffer(0), createEndOfPartition(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
- handler.setNextExpectedCheckpointId(1L);
-
- // checkpoint 1
- check(sequence[0], buffer.getNextNonBlocked());
- check(sequence[1], buffer.getNextNonBlocked());
- check(sequence[2], buffer.getNextNonBlocked());
- check(sequence[7], buffer.getNextNonBlocked());
- assertEquals(1L, buffer.getCurrentCheckpointId());
-
- check(sequence[5], buffer.getNextNonBlocked());
- check(sequence[6], buffer.getNextNonBlocked());
- check(sequence[9], buffer.getNextNonBlocked());
- check(sequence[10], buffer.getNextNonBlocked());
-
- // alignment of checkpoint 2
- check(sequence[13], buffer.getNextNonBlocked());
- assertEquals(2L, buffer.getCurrentCheckpointId());
- check(sequence[15], buffer.getNextNonBlocked());
-
- // checkpoint 2 aborted, checkpoint 3 started
- check(sequence[12], buffer.getNextNonBlocked());
- assertEquals(3L, buffer.getCurrentCheckpointId());
- check(sequence[16], buffer.getNextNonBlocked());
- check(sequence[19], buffer.getNextNonBlocked());
- check(sequence[20], buffer.getNextNonBlocked());
-
- // checkpoint 3 aborted (end of partition)
- check(sequence[18], buffer.getNextNonBlocked());
- check(sequence[21], buffer.getNextNonBlocked());
- check(sequence[22], buffer.getNextNonBlocked());
- check(sequence[23], buffer.getNextNonBlocked());
- check(sequence[24], buffer.getNextNonBlocked());
-
- assertNull(buffer.getNextNonBlocked());
- assertNull(buffer.getNextNonBlocked());
-
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Validates that the buffer skips over the current checkpoint if it
- * receives a barrier from a later checkpoint on a non-blocked input.
- */
- @Test
- public void testMultiChannelJumpingOverCheckpoint() {
- try {
- BufferOrEvent[] sequence = {
- // checkpoint 1 - with blocked data
- createBuffer(0), createBuffer(2), createBuffer(0),
- createBarrier(1, 1), createBarrier(1, 2),
- createBuffer(2), createBuffer(1), createBuffer(0),
- createBarrier(1, 0),
- createBuffer(1), createBuffer(0),
-
- // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3
- createBarrier(2, 1),
- createBuffer(1), createBuffer(2),
- createBarrier(2, 0),
- createBuffer(2), createBuffer(0),
- createBarrier(3, 1),
- createBuffer(1), createBuffer(2),
- createBarrier(3, 0),
- createBuffer(2), createBuffer(0),
- createBarrier(4, 2),
-
- createBuffer(2),
- createBuffer(1), createEndOfPartition(1),
- createBuffer(2), createEndOfPartition(2),
- createBuffer(0), createEndOfPartition(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
- handler.setNextExpectedCheckpointId(1L);
-
- // checkpoint 1
- check(sequence[0], buffer.getNextNonBlocked());
- check(sequence[1], buffer.getNextNonBlocked());
- check(sequence[2], buffer.getNextNonBlocked());
- check(sequence[7], buffer.getNextNonBlocked());
- assertEquals(1L, buffer.getCurrentCheckpointId());
-
- check(sequence[5], buffer.getNextNonBlocked());
- check(sequence[6], buffer.getNextNonBlocked());
- check(sequence[9], buffer.getNextNonBlocked());
- check(sequence[10], buffer.getNextNonBlocked());
-
- // alignment of checkpoint 2
- check(sequence[13], buffer.getNextNonBlocked());
- assertEquals(2L, buffer.getCurrentCheckpointId());
- check(sequence[15], buffer.getNextNonBlocked());
- check(sequence[19], buffer.getNextNonBlocked());
- check(sequence[21], buffer.getNextNonBlocked());
-
- // checkpoint 2 aborted, checkpoint 4 started. replay buffered
- check(sequence[12], buffer.getNextNonBlocked());
- assertEquals(4L, buffer.getCurrentCheckpointId());
- check(sequence[16], buffer.getNextNonBlocked());
- check(sequence[18], buffer.getNextNonBlocked());
- check(sequence[22], buffer.getNextNonBlocked());
-
- // align checkpoint 4 remainder
- check(sequence[25], buffer.getNextNonBlocked());
- check(sequence[26], buffer.getNextNonBlocked());
-
- // checkpoint 4 aborted (due to end of partition)
- check(sequence[24], buffer.getNextNonBlocked());
- check(sequence[27], buffer.getNextNonBlocked());
- check(sequence[28], buffer.getNextNonBlocked());
- check(sequence[29], buffer.getNextNonBlocked());
- check(sequence[30], buffer.getNextNonBlocked());
-
- assertNull(buffer.getNextNonBlocked());
- assertNull(buffer.getNextNonBlocked());
-
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Validates that the buffer skips over a later checkpoint if it
- * receives a barrier from an even later checkpoint on a blocked input.
- */
- @Test
- public void testMultiChannelSkippingCheckpointsViaBlockedInputs() {
- try {
- BufferOrEvent[] sequence = {
- // checkpoint 1 - with blocked data
- createBuffer(0), createBuffer(2), createBuffer(0),
- createBarrier(1, 1), createBarrier(1, 2),
- createBuffer(2), createBuffer(1), createBuffer(0),
- createBarrier(1, 0),
- createBuffer(1), createBuffer(0),
-
- // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3
- createBarrier(2, 1),
- createBuffer(1), createBuffer(2),
- createBarrier(2, 0),
- createBuffer(1), createBuffer(0),
-
- createBarrier(3, 0), // queued barrier on blocked input
- createBuffer(0),
-
- createBarrier(4, 1), // pre-mature barrier on blocked input
- createBuffer(1),
- createBuffer(0),
- createBuffer(2),
-
- // complete checkpoint 2
- createBarrier(2, 2),
- createBuffer(0),
-
- createBarrier(3, 2), // should be ignored
- createBuffer(2),
- createBarrier(4, 0),
- createBuffer(0), createBuffer(1), createBuffer(2),
- createBarrier(4, 2),
-
- createBuffer(1), createEndOfPartition(1),
- createBuffer(2), createEndOfPartition(2),
- createBuffer(0), createEndOfPartition(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- // checkpoint 1
- check(sequence[0], buffer.getNextNonBlocked());
- check(sequence[1], buffer.getNextNonBlocked());
- check(sequence[2], buffer.getNextNonBlocked());
- check(sequence[7], buffer.getNextNonBlocked());
- assertEquals(1L, buffer.getCurrentCheckpointId());
- check(sequence[5], buffer.getNextNonBlocked());
- check(sequence[6], buffer.getNextNonBlocked());
- check(sequence[9], buffer.getNextNonBlocked());
- check(sequence[10], buffer.getNextNonBlocked());
-
- // alignment of checkpoint 2
- check(sequence[13], buffer.getNextNonBlocked());
- check(sequence[22], buffer.getNextNonBlocked());
- assertEquals(2L, buffer.getCurrentCheckpointId());
-
- // checkpoint 2 completed
- check(sequence[12], buffer.getNextNonBlocked());
- check(sequence[15], buffer.getNextNonBlocked());
- check(sequence[16], buffer.getNextNonBlocked());
-
- // checkpoint 3 skipped, alignment for 4 started
- check(sequence[18], buffer.getNextNonBlocked());
- assertEquals(4L, buffer.getCurrentCheckpointId());
- check(sequence[21], buffer.getNextNonBlocked());
- check(sequence[24], buffer.getNextNonBlocked());
- check(sequence[26], buffer.getNextNonBlocked());
- check(sequence[30], buffer.getNextNonBlocked());
-
- // checkpoint 4 completed
- check(sequence[20], buffer.getNextNonBlocked());
- check(sequence[28], buffer.getNextNonBlocked());
- check(sequence[29], buffer.getNextNonBlocked());
-
- check(sequence[32], buffer.getNextNonBlocked());
- check(sequence[33], buffer.getNextNonBlocked());
- check(sequence[34], buffer.getNextNonBlocked());
- check(sequence[35], buffer.getNextNonBlocked());
- check(sequence[36], buffer.getNextNonBlocked());
- check(sequence[37], buffer.getNextNonBlocked());
-
- assertNull(buffer.getNextNonBlocked());
- assertNull(buffer.getNextNonBlocked());
-
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testEarlyCleanup() {
- try {
- BufferOrEvent[] sequence = {
- createBuffer(0), createBuffer(1), createBuffer(2),
- createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
-
- createBuffer(2), createBuffer(1), createBuffer(0),
- createBarrier(2, 1),
- createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
- createBarrier(2, 2),
- createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
- handler.setNextExpectedCheckpointId(1L);
-
- // pre-checkpoint 1
- check(sequence[0], buffer.getNextNonBlocked());
- check(sequence[1], buffer.getNextNonBlocked());
- check(sequence[2], buffer.getNextNonBlocked());
- assertEquals(1L, handler.getNextExpectedCheckpointId());
-
- // pre-checkpoint 2
- check(sequence[6], buffer.getNextNonBlocked());
- assertEquals(2L, handler.getNextExpectedCheckpointId());
- check(sequence[7], buffer.getNextNonBlocked());
- check(sequence[8], buffer.getNextNonBlocked());
-
- // checkpoint 2 alignment
- check(sequence[13], buffer.getNextNonBlocked());
- check(sequence[14], buffer.getNextNonBlocked());
- check(sequence[18], buffer.getNextNonBlocked());
- check(sequence[19], buffer.getNextNonBlocked());
-
- // end of stream: remaining buffered contents
- buffer.getNextNonBlocked();
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testStartAlignmentWithClosedChannels() {
- try {
- BufferOrEvent[] sequence = {
- // close some channels immediately
- createEndOfPartition(2), createEndOfPartition(1),
-
- // checkpoint without blocked data
- createBuffer(0), createBuffer(0), createBuffer(3),
- createBarrier(2, 3), createBarrier(2, 0),
-
- // checkpoint with blocked data
- createBuffer(3), createBuffer(0),
- createBarrier(3, 3),
- createBuffer(3), createBuffer(0),
- createBarrier(3, 0),
-
- // empty checkpoint
- createBarrier(4, 0), createBarrier(4, 3),
-
- // some data, one channel closes
- createBuffer(0), createBuffer(0), createBuffer(3),
- createEndOfPartition(0),
-
- // checkpoint on last remaining channel
- createBuffer(3),
- createBarrier(5, 3),
- createBuffer(3),
- createEndOfPartition(3)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
-
- BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
- // pre checkpoint 2
- check(sequence[0], buffer.getNextNonBlocked());
- check(sequence[1], buffer.getNextNonBlocked());
- check(sequence[2], buffer.getNextNonBlocked());
- check(sequence[3], buffer.getNextNonBlocked());
- check(sequence[4], buffer.getNextNonBlocked());
-
- // checkpoint 3 alignment
- check(sequence[7], buffer.getNextNonBlocked());
- assertEquals(2L, buffer.getCurrentCheckpointId());
- check(sequence[8], buffer.getNextNonBlocked());
- check(sequence[11], buffer.getNextNonBlocked());
-
- // checkpoint 3 buffered
- check(sequence[10], buffer.getNextNonBlocked());
- assertEquals(3L, buffer.getCurrentCheckpointId());
-
- // after checkpoint 4
- check(sequence[15], buffer.getNextNonBlocked());
- assertEquals(4L, buffer.getCurrentCheckpointId());
- check(sequence[16], buffer.getNextNonBlocked());
- check(sequence[17], buffer.getNextNonBlocked());
- check(sequence[18], buffer.getNextNonBlocked());
-
- check(sequence[19], buffer.getNextNonBlocked());
- check(sequence[21], buffer.getNextNonBlocked());
- assertEquals(5L, buffer.getCurrentCheckpointId());
- check(sequence[22], buffer.getNextNonBlocked());
-
- assertNull(buffer.getNextNonBlocked());
- assertNull(buffer.getNextNonBlocked());
-
- buffer.cleanup();
-
- checkNoTempFilesRemain();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testEndOfStreamWhileCheckpoint() {
-
- }
-
- // ------------------------------------------------------------------------
- // Utils
- // ------------------------------------------------------------------------
-
- private static BufferOrEvent createBarrier(long id, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
- }
-
- private static BufferOrEvent createBuffer(int channel) {
- // since we have no access to the contents, we need to use the size as an
- // identifier to validate correctness here
- Buffer buf = new Buffer(
- MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE),
- FreeingBufferRecycler.INSTANCE);
-
- buf.setSize(SIZE_COUNTER++);
- return new BufferOrEvent(buf, channel);
- }
-
- private static BufferOrEvent createEndOfPartition(int channel) {
- return new BufferOrEvent(EndOfPartitionEvent.INSTANCE, channel);
- }
-
- private static void check(BufferOrEvent expected, BufferOrEvent present) {
- assertNotNull(expected);
- assertNotNull(present);
- assertEquals(expected.isBuffer(), present.isBuffer());
-
- if (expected.isBuffer()) {
- // since we have no access to the contents, we need to use the size as an
- // identifier to validate correctness here
- assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
- }
- else {
- assertEquals(expected.getEvent(), present.getEvent());
- }
- }
-
- private static void checkNoTempFilesRemain() {
- // validate that all temp files have been removed
- for (File dir : IO_MANAGER.getSpillingDirectories()) {
- for (String file : dir.list()) {
- if (file != null && !(file.equals(".") || file.equals(".."))) {
- fail("barrier buffer did not clean up temp files. remaining file: " + file);
- }
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Testing Mocks
- // ------------------------------------------------------------------------
-
- private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
-
- private long nextExpectedCheckpointId = -1L;
-
- public void setNextExpectedCheckpointId(long nextExpectedCheckpointId) {
- this.nextExpectedCheckpointId = nextExpectedCheckpointId;
- }
-
- public long getNextExpectedCheckpointId() {
- return nextExpectedCheckpointId;
- }
-
- @Override
- public void onEvent(CheckpointBarrier barrier) {
- assertNotNull(barrier);
- assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
- assertTrue(barrier.getTimestamp() > 0);
- nextExpectedCheckpointId++;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
deleted file mode 100644
index b9b6e5f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the behavior of the barrier tracker.
- */
-public class BarrierTrackerTest {
-
- private static final int PAGE_SIZE = 512;
-
- @Test
- public void testSingleChannelNoBarriers() {
- try {
- BufferOrEvent[] sequence = { createBuffer(0), createBuffer(0), createBuffer(0) };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
- BarrierTracker tracker = new BarrierTracker(gate);
-
- for (BufferOrEvent boe : sequence) {
- assertEquals(boe, tracker.getNextNonBlocked());
- }
-
- assertNull(tracker.getNextNonBlocked());
- assertNull(tracker.getNextNonBlocked());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testMultiChannelNoBarriers() {
- try {
- BufferOrEvent[] sequence = { createBuffer(2), createBuffer(2), createBuffer(0),
- createBuffer(1), createBuffer(0), createBuffer(3),
- createBuffer(1), createBuffer(1), createBuffer(2)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
- BarrierTracker tracker = new BarrierTracker(gate);
-
- for (BufferOrEvent boe : sequence) {
- assertEquals(boe, tracker.getNextNonBlocked());
- }
-
- assertNull(tracker.getNextNonBlocked());
- assertNull(tracker.getNextNonBlocked());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSingleChannelWithBarriers() {
- try {
- BufferOrEvent[] sequence = {
- createBuffer(0), createBuffer(0), createBuffer(0),
- createBarrier(1, 0),
- createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0),
- createBarrier(2, 0), createBarrier(3, 0),
- createBuffer(0), createBuffer(0),
- createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
- createBuffer(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
- BarrierTracker tracker = new BarrierTracker(gate);
-
- CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
- tracker.registerCheckpointEventHandler(validator);
-
- for (BufferOrEvent boe : sequence) {
- if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
- assertEquals(boe, tracker.getNextNonBlocked());
- }
- }
-
- assertNull(tracker.getNextNonBlocked());
- assertNull(tracker.getNextNonBlocked());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSingleChannelWithSkippedBarriers() {
- try {
- BufferOrEvent[] sequence = {
- createBuffer(0),
- createBarrier(1, 0),
- createBuffer(0), createBuffer(0),
- createBarrier(3, 0), createBuffer(0),
- createBarrier(4, 0), createBarrier(6, 0), createBuffer(0),
- createBarrier(7, 0), createBuffer(0), createBarrier(10, 0),
- createBuffer(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
- BarrierTracker tracker = new BarrierTracker(gate);
-
- CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
- tracker.registerCheckpointEventHandler(validator);
-
- for (BufferOrEvent boe : sequence) {
- if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
- assertEquals(boe, tracker.getNextNonBlocked());
- }
- }
-
- assertNull(tracker.getNextNonBlocked());
- assertNull(tracker.getNextNonBlocked());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testMultiChannelWithBarriers() {
- try {
- BufferOrEvent[] sequence = {
- createBuffer(0), createBuffer(2), createBuffer(0),
- createBarrier(1, 1), createBarrier(1, 2),
- createBuffer(2), createBuffer(1),
- createBarrier(1, 0),
-
- createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
- createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
-
- createBuffer(2), createBuffer(2),
- createBarrier(3, 2),
- createBuffer(2), createBuffer(2),
- createBarrier(3, 0), createBarrier(3, 1),
-
- createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0),
-
- createBuffer(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierTracker tracker = new BarrierTracker(gate);
-
- CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(1, 2, 3, 4);
- tracker.registerCheckpointEventHandler(validator);
-
- for (BufferOrEvent boe : sequence) {
- if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
- assertEquals(boe, tracker.getNextNonBlocked());
- }
- }
-
- assertNull(tracker.getNextNonBlocked());
- assertNull(tracker.getNextNonBlocked());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testMultiChannelSkippingCheckpoints() {
- try {
- BufferOrEvent[] sequence = {
- createBuffer(0), createBuffer(2), createBuffer(0),
- createBarrier(1, 1), createBarrier(1, 2),
- createBuffer(2), createBuffer(1),
- createBarrier(1, 0),
-
- createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
- createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
-
- createBuffer(2), createBuffer(2),
- createBarrier(3, 2),
- createBuffer(2), createBuffer(2),
-
- // jump to checkpoint 4
- createBarrier(4, 0),
- createBuffer(0), createBuffer(1), createBuffer(2),
- createBarrier(4, 1),
- createBuffer(1),
- createBarrier(4, 2),
-
- createBuffer(0)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierTracker tracker = new BarrierTracker(gate);
-
- CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(1, 2, 4);
- tracker.registerCheckpointEventHandler(validator);
-
- for (BufferOrEvent boe : sequence) {
- if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
- assertEquals(boe, tracker.getNextNonBlocked());
- }
- }
-
- assertNull(tracker.getNextNonBlocked());
- assertNull(tracker.getNextNonBlocked());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * This test validates that the barrier tracker does not immediately
- * discard a pending checkpoint as soon as it sees a barrier from a
- * later checkpoint from some channel.
- *
- * This behavior is crucial, otherwise topologies where different inputs
- * have different latency (and that latency is close to or higher than the
- * checkpoint interval) may skip many checkpoints, or fail to complete a
- * checkpoint all together.
- */
- @Test
- public void testCompleteCheckpointsOnLateBarriers() {
- try {
- BufferOrEvent[] sequence = {
- // checkpoint 2
- createBuffer(1), createBuffer(1), createBuffer(0), createBuffer(2),
- createBarrier(2, 1), createBarrier(2, 0), createBarrier(2, 2),
-
- // incomplete checkpoint 3
- createBuffer(1), createBuffer(0),
- createBarrier(3, 1), createBarrier(3, 2),
-
- // some barriers from checkpoint 4
- createBuffer(1), createBuffer(0),
- createBarrier(4, 2), createBarrier(4, 1),
- createBuffer(1), createBuffer(2),
-
- // last barrier from checkpoint 3
- createBarrier(3, 0),
-
- // complete checkpoint 4
- createBuffer(0), createBarrier(4, 0),
-
- // regular checkpoint 5
- createBuffer(1), createBuffer(2), createBarrier(5, 1),
- createBuffer(0), createBarrier(5, 0),
- createBuffer(1), createBarrier(5, 2),
-
- // checkpoint 6 (incomplete),
- createBuffer(1), createBarrier(6, 1),
- createBuffer(0), createBarrier(6, 0),
-
- // checkpoint 7, with early barriers for checkpoints 8 and 9
- createBuffer(1), createBarrier(7, 1),
- createBuffer(0), createBarrier(7, 2),
- createBuffer(2), createBarrier(8, 2),
- createBuffer(0), createBarrier(8, 1),
- createBuffer(1), createBarrier(9, 1),
-
- // complete checkpoint 7, first barriers from checkpoint 10
- createBarrier(7, 0),
- createBuffer(0), createBarrier(9, 2),
- createBuffer(2), createBarrier(10, 2),
-
- // complete checkpoint 8 and 9
- createBarrier(8, 0),
- createBuffer(1), createBuffer(2), createBarrier(9, 0),
-
- // trailing data
- createBuffer(1), createBuffer(0), createBuffer(2)
- };
-
- MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
- BarrierTracker tracker = new BarrierTracker(gate);
-
- CheckpointSequenceValidator validator =
- new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9);
- tracker.registerCheckpointEventHandler(validator);
-
- for (BufferOrEvent boe : sequence) {
- if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
- assertEquals(boe, tracker.getNextNonBlocked());
- }
- }
-
- assertNull(tracker.getNextNonBlocked());
- assertNull(tracker.getNextNonBlocked());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
- // Utils
- // ------------------------------------------------------------------------
-
- private static BufferOrEvent createBarrier(long id, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
- }
-
- private static BufferOrEvent createBuffer(int channel) {
- return new BufferOrEvent(
- new Buffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
- }
-
- // ------------------------------------------------------------------------
- // Testing Mocks
- // ------------------------------------------------------------------------
-
- private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
-
- private final long[] checkpointIDs;
-
- private int i = 0;
-
- private CheckpointSequenceValidator(long... checkpointIDs) {
- this.checkpointIDs = checkpointIDs;
- }
-
- @Override
- public void onEvent(CheckpointBarrier barrier) {
- assertTrue("More checkpoints than expected", i < checkpointIDs.length);
- assertNotNull(barrier);
- assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId());
- assertTrue(barrier.getTimestamp() > 0);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
deleted file mode 100644
index e85eddb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Random;
-
-
-import static org.junit.Assert.*;
-
-public class BufferSpillerTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(BufferSpillerTest.class);
-
- private static final int PAGE_SIZE = 4096;
-
- private static IOManager IO_MANAGER;
-
- private BufferSpiller spiller;
-
-
- // ------------------------------------------------------------------------
- // Setup / Cleanup
- // ------------------------------------------------------------------------
-
- @BeforeClass
- public static void setupIOManager() {
- IO_MANAGER = new IOManagerAsync();
- }
-
- @AfterClass
- public static void shutdownIOManager() {
- IO_MANAGER.shutdown();
- }
-
- @Before
- public void createSpiller() {
- try {
- spiller = new BufferSpiller(IO_MANAGER, PAGE_SIZE);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Cannot create BufferSpiller: " + e.getMessage());
- }
- }
-
- @After
- public void cleanupSpiller() {
- if (spiller != null) {
- try {
- spiller.close();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Cannot properly close the BufferSpiller: " + e.getMessage());
- }
-
- assertFalse(spiller.getCurrentChannel().isOpen());
- assertFalse(spiller.getCurrentSpillFile().exists());
- }
-
- checkNoTempFilesRemain();
- }
-
- // ------------------------------------------------------------------------
- // Tests
- // ------------------------------------------------------------------------
-
- @Test
- public void testRollOverEmptySequences() {
- try {
- assertNull(spiller.rollOver());
- assertNull(spiller.rollOver());
- assertNull(spiller.rollOver());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSpillAndRollOverSimple() {
- try {
- final Random rnd = new Random();
- final Random bufferRnd = new Random();
-
- final int maxNumEventsAndBuffers = 3000;
- final int maxNumChannels = 1656;
-
- // do multiple spilling / rolling over rounds
- for (int round = 0; round < 5; round++) {
-
- final long bufferSeed = rnd.nextLong();
- bufferRnd.setSeed(bufferSeed);
-
- final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
- final int numChannels = rnd.nextInt(maxNumChannels) + 1;
-
- final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
-
- // generate sequence
- for (int i = 0; i < numEventsAndBuffers; i++) {
- boolean isEvent = rnd.nextDouble() < 0.05d;
- if (isEvent) {
- BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
- events.add(evt);
- spiller.add(evt);
- }
- else {
- BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
- spiller.add(evt);
- }
- }
-
- // reset and create reader
- bufferRnd.setSeed(bufferSeed);
-
- BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
- seq.open();
-
- // read and validate the sequence
-
- int numEvent = 0;
- for (int i = 0; i < numEventsAndBuffers; i++) {
- BufferOrEvent next = seq.getNext();
- assertNotNull(next);
- if (next.isEvent()) {
- BufferOrEvent expected = events.get(numEvent++);
- assertEquals(expected.getEvent(), next.getEvent());
- assertEquals(expected.getChannelIndex(), next.getChannelIndex());
- }
- else {
- validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
- }
- }
-
- // no further data
- assertNull(seq.getNext());
-
- // all events need to be consumed
- assertEquals(events.size(), numEvent);
-
- seq.cleanup();
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSpillWhileReading() {
- LOG.info("Starting SpillWhileReading test");
-
- try {
- final int sequences = 10;
-
- final Random rnd = new Random();
-
- final int maxNumEventsAndBuffers = 30000;
- final int maxNumChannels = 1656;
-
- int sequencesConsumed = 0;
-
- ArrayDeque<SequenceToConsume> pendingSequences = new ArrayDeque<SequenceToConsume>();
- SequenceToConsume currentSequence = null;
- int currentNumEvents = 0;
- int currentNumRecordAndEvents = 0;
-
- // do multiple spilling / rolling over rounds
- for (int round = 0; round < 2*sequences; round++) {
-
- if (round % 2 == 1) {
- // make this an empty sequence
- assertNull(spiller.rollOver());
- }
- else {
- // proper spilled sequence
- final long bufferSeed = rnd.nextLong();
- final Random bufferRnd = new Random(bufferSeed);
-
- final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
- final int numChannels = rnd.nextInt(maxNumChannels) + 1;
-
- final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
-
- int generated = 0;
- while (generated < numEventsAndBuffers) {
-
- if (currentSequence == null || rnd.nextDouble() < 0.5) {
- // add a new record
- boolean isEvent = rnd.nextDouble() < 0.05;
- if (isEvent) {
- BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
- events.add(evt);
- spiller.add(evt);
- }
- else {
- BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
- spiller.add(evt);
- }
- generated++;
- }
- else {
- // consume a record
- BufferOrEvent next = currentSequence.sequence.getNext();
- assertNotNull(next);
- if (next.isEvent()) {
- BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
- assertEquals(expected.getEvent(), next.getEvent());
- assertEquals(expected.getChannelIndex(), next.getChannelIndex());
- }
- else {
- Random validationRnd = currentSequence.bufferRnd;
- validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels));
- }
-
- currentNumRecordAndEvents++;
- if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
- // done with the sequence
- currentSequence.sequence.cleanup();
- sequencesConsumed++;
-
- // validate we had all events
- assertEquals(currentSequence.events.size(), currentNumEvents);
-
- // reset
- currentSequence = pendingSequences.pollFirst();
- if (currentSequence != null) {
- currentSequence.sequence.open();
- }
-
- currentNumRecordAndEvents = 0;
- currentNumEvents = 0;
- }
- }
- }
-
- // done generating a sequence. queue it for consumption
- bufferRnd.setSeed(bufferSeed);
- BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
-
- SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels);
-
- if (currentSequence == null) {
- currentSequence = stc;
- stc.sequence.open();
- }
- else {
- pendingSequences.addLast(stc);
- }
- }
- }
-
- // consume all the remainder
- while (currentSequence != null) {
- // consume a record
- BufferOrEvent next = currentSequence.sequence.getNext();
- assertNotNull(next);
- if (next.isEvent()) {
- BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
- assertEquals(expected.getEvent(), next.getEvent());
- assertEquals(expected.getChannelIndex(), next.getChannelIndex());
- }
- else {
- Random validationRnd = currentSequence.bufferRnd;
- validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels));
- }
-
- currentNumRecordAndEvents++;
- if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
- // done with the sequence
- currentSequence.sequence.cleanup();
- sequencesConsumed++;
-
- // validate we had all events
- assertEquals(currentSequence.events.size(), currentNumEvents);
-
- // reset
- currentSequence = pendingSequences.pollFirst();
- if (currentSequence != null) {
- currentSequence.sequence.open();
- }
-
- currentNumRecordAndEvents = 0;
- currentNumEvents = 0;
- }
- }
-
- assertEquals(sequences, sequencesConsumed);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
- // Utils
- // ------------------------------------------------------------------------
-
- private static BufferOrEvent generateRandomEvent(Random rnd, int numChannels) {
- long magicNumber = rnd.nextLong();
- byte[] data = new byte[rnd.nextInt(1000)];
- rnd.nextBytes(data);
- TestEvent evt = new TestEvent(magicNumber, data);
-
- int channelIndex = rnd.nextInt(numChannels);
-
- return new BufferOrEvent(evt, channelIndex);
- }
-
- private static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
- MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
- for (int i = 0; i < size; i++) {
- seg.put(i, (byte) i);
- }
-
- Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
- buf.setSize(size);
- return new BufferOrEvent(buf, channelIndex);
- }
-
- private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
- assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
- assertTrue("is not buffer", boe.isBuffer());
-
- Buffer buf = boe.getBuffer();
- assertEquals("wrong buffer size", expectedSize, buf.getSize());
-
- MemorySegment seg = buf.getMemorySegment();
- for (int i = 0; i < expectedSize; i++) {
- byte expected = (byte) i;
- if (expected != seg.get(i)) {
- fail(String.format(
- "wrong buffer contents at position %s : expected=%d , found=%d", i, expected, seg.get(i)));
- }
- }
- }
-
- private static void checkNoTempFilesRemain() {
- // validate that all temp files have been removed
- for (File dir : IO_MANAGER.getSpillingDirectories()) {
- for (String file : dir.list()) {
- if (file != null && !(file.equals(".") || file.equals(".."))) {
- fail("barrier buffer did not clean up temp files. remaining file: " + file);
- }
- }
- }
- }
-
- private static class SequenceToConsume {
-
- final BufferSpiller.SpilledBufferOrEventSequence sequence;
- final ArrayList<BufferOrEvent> events;
- final Random bufferRnd;
- final int numBuffersAndEvents;
- final int numChannels;
-
- private SequenceToConsume(Random bufferRnd, ArrayList<BufferOrEvent> events,
- BufferSpiller.SpilledBufferOrEventSequence sequence,
- int numBuffersAndEvents, int numChannels) {
- this.bufferRnd = bufferRnd;
- this.events = events;
- this.sequence = sequence;
- this.numBuffersAndEvents = numBuffersAndEvents;
- this.numChannels = numChannels;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
deleted file mode 100644
index cb8a058..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Queue;
-
-public class MockInputGate implements InputGate {
-
- private final int pageSize;
-
- private final int numChannels;
-
- private final Queue<BufferOrEvent> boes;
-
- private final boolean[] closed;
-
- private int closedChannels;
-
-
- public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> boes) {
- this.pageSize = pageSize;
- this.numChannels = numChannels;
- this.boes = new ArrayDeque<BufferOrEvent>(boes);
- this.closed = new boolean[numChannels];
- }
-
- @Override
- public int getPageSize() {
- return pageSize;
- }
-
- @Override
- public int getNumberOfInputChannels() {
- return numChannels;
- }
-
- @Override
- public boolean isFinished() {
- return boes.isEmpty();
- }
-
- @Override
- public BufferOrEvent getNextBufferOrEvent() {
- BufferOrEvent next = boes.poll();
- if (next == null) {
- return null;
- }
-
- int channelIdx = next.getChannelIndex();
- if (closed[channelIdx]) {
- throw new RuntimeException("Inconsistent: Channel " + channelIdx
- + " has data even though it is already closed.");
- }
- if (next.isEvent() && next.getEvent() instanceof EndOfPartitionEvent) {
- closed[channelIdx] = true;
- closedChannels++;
- }
- return next;
- }
-
- @Override
- public void requestPartitions() {}
-
- @Override
- public void sendTaskEvent(TaskEvent event) {}
-
- @Override
- public void registerListener(EventListener<InputGate> listener) {}
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
deleted file mode 100644
index 991b033..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.streaming.runtime.io.BufferSpiller.SpilledBufferOrEventSequence;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests that validate the behavior of the {@link SpilledBufferOrEventSequence} in isolation,
- * with respect to detecting corrupt sequences, trailing data, and interleaved buffers and events.
- */
-public class SpilledBufferOrEventSequenceTest {
-
- private final ByteBuffer buffer = ByteBuffer.allocateDirect(128 * 1024).order(ByteOrder.LITTLE_ENDIAN);
- private final int pageSize = 32*1024;
-
- private File tempFile;
- private FileChannel fileChannel;
-
-
- @Before
- public void initTempChannel() {
- try {
- tempFile = File.createTempFile("testdata", "tmp");
- fileChannel = new RandomAccessFile(tempFile, "rw").getChannel();
- }
- catch (Exception e) {
- cleanup();
- }
- }
-
- @After
- public void cleanup() {
- if (fileChannel != null) {
- try {
- fileChannel.close();
- }
- catch (IOException e) {
- // ignore
- }
- }
- if (tempFile != null) {
- //noinspection ResultOfMethodCallIgnored
- tempFile.delete();
- }
- }
-
-
- // ------------------------------------------------------------------------
- // Tests
- // ------------------------------------------------------------------------
-
- @Test
- public void testEmptyChannel() {
- try {
- SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
- seq.open();
-
- assertNull(seq.getNext());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testIncompleteHeaderOnFirstElement() {
- try {
- ByteBuffer buf = ByteBuffer.allocate(7);
- buf.order(ByteOrder.LITTLE_ENDIAN);
-
- fileChannel.write(buf);
- fileChannel.position(0);
-
- SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
- seq.open();
-
- try {
- seq.getNext();
- fail("should fail with an exception");
- }
- catch (IOException e) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testBufferSequence() {
- try {
- final Random rnd = new Random();
- final long seed = rnd.nextLong();
-
- final int numBuffers = 325;
- final int numChannels = 671;
-
- rnd.setSeed(seed);
-
- for (int i = 0; i < numBuffers; i++) {
- writeBuffer(fileChannel, rnd.nextInt(pageSize) + 1, rnd.nextInt(numChannels));
- }
-
- fileChannel.position(0L);
- rnd.setSeed(seed);
-
- SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
- seq.open();
-
- for (int i = 0; i < numBuffers; i++) {
- validateBuffer(seq.getNext(), rnd.nextInt(pageSize) + 1, rnd.nextInt(numChannels));
- }
-
- // should have no more data
- assertNull(seq.getNext());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testBufferSequenceWithIncompleteBuffer() {
- try {
- writeBuffer(fileChannel, 1672, 7);
-
- // write an incomplete buffer
- ByteBuffer data = ByteBuffer.allocate(615);
- data.order(ByteOrder.LITTLE_ENDIAN);
-
- data.putInt(2);
- data.putInt(999);
- data.put((byte) 0);
- data.position(0);
- data.limit(312);
- fileChannel.write(data);
- fileChannel.position(0L);
-
- SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
- seq.open();
-
- // first one is valid
- validateBuffer(seq.getNext(), 1672, 7);
-
- // next one should fail
- try {
- seq.getNext();
- fail("should fail with an exception");
- }
- catch (IOException e) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testEventSequence() {
- try {
- final Random rnd = new Random();
- final int numEvents = 3000;
- final int numChannels = 1656;
-
- final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(numEvents);
-
- for (int i = 0; i < numEvents; i++) {
- events.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
- }
-
- fileChannel.position(0L);
- SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
- seq.open();
-
- int i = 0;
- BufferOrEvent boe;
- while ((boe = seq.getNext()) != null) {
- BufferOrEvent expected = events.get(i);
- assertTrue(boe.isEvent());
- assertEquals(expected.getEvent(), boe.getEvent());
- assertEquals(expected.getChannelIndex(), boe.getChannelIndex());
- i++;
- }
-
- assertEquals(numEvents, i);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testMixedSequence() {
- try {
- final Random rnd = new Random();
- final Random bufferRnd = new Random();
-
- final long bufferSeed = rnd.nextLong();
- bufferRnd.setSeed(bufferSeed);
-
- final int numEventsAndBuffers = 3000;
- final int numChannels = 1656;
-
- final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
-
- // generate sequence
-
- for (int i = 0; i < numEventsAndBuffers; i++) {
- boolean isEvent = rnd.nextDouble() < 0.05d;
- if (isEvent) {
- events.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
- }
- else {
- writeBuffer(fileChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
- }
- }
-
- // reset and create reader
-
- fileChannel.position(0L);
- bufferRnd.setSeed(bufferSeed);
- SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
- seq.open();
-
- // read and validate the sequence
-
- int numEvent = 0;
- for (int i = 0; i < numEventsAndBuffers; i++) {
- BufferOrEvent next = seq.getNext();
- if (next.isEvent()) {
- BufferOrEvent expected = events.get(numEvent++);
- assertEquals(expected.getEvent(), next.getEvent());
- assertEquals(expected.getChannelIndex(), next.getChannelIndex());
- }
- else {
- validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
- }
- }
-
- // no further data
- assertNull(seq.getNext());
-
- // all events need to be consumed
- assertEquals(events.size(), numEvent);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testMultipleSequences() {
- File secondFile = null;
- FileChannel secondChannel = null;
-
- try {
- // create the second file channel
- secondFile = File.createTempFile("testdata", "tmp");
- secondChannel = new RandomAccessFile(secondFile, "rw").getChannel();
-
- final Random rnd = new Random();
- final Random bufferRnd = new Random();
-
- final long bufferSeed = rnd.nextLong();
- bufferRnd.setSeed(bufferSeed);
-
- final int numEventsAndBuffers1 = 272;
- final int numEventsAndBuffers2 = 151;
-
- final int numChannels = 1656;
-
- final ArrayList<BufferOrEvent> events1 = new ArrayList<BufferOrEvent>(128);
- final ArrayList<BufferOrEvent> events2 = new ArrayList<BufferOrEvent>(128);
-
- // generate sequence 1
-
- for (int i = 0; i < numEventsAndBuffers1; i++) {
- boolean isEvent = rnd.nextDouble() < 0.05d;
- if (isEvent) {
- events1.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
- }
- else {
- writeBuffer(fileChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
- }
- }
-
- // generate sequence 2
-
- for (int i = 0; i < numEventsAndBuffers2; i++) {
- boolean isEvent = rnd.nextDouble() < 0.05d;
- if (isEvent) {
- events2.add(generateAndWriteEvent(secondChannel, rnd, numChannels));
- }
- else {
- writeBuffer(secondChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
- }
- }
-
- // reset and create reader
-
- fileChannel.position(0L);
- secondChannel.position(0L);
-
- bufferRnd.setSeed(bufferSeed);
-
- SpilledBufferOrEventSequence seq1 = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
- SpilledBufferOrEventSequence seq2 = new SpilledBufferOrEventSequence(secondFile, secondChannel, buffer, pageSize);
-
- // read and validate the sequence 1
- seq1.open();
-
- int numEvent = 0;
- for (int i = 0; i < numEventsAndBuffers1; i++) {
- BufferOrEvent next = seq1.getNext();
- if (next.isEvent()) {
- BufferOrEvent expected = events1.get(numEvent++);
- assertEquals(expected.getEvent(), next.getEvent());
- assertEquals(expected.getChannelIndex(), next.getChannelIndex());
- }
- else {
- validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
- }
- }
- assertNull(seq1.getNext());
- assertEquals(events1.size(), numEvent);
-
- // read and validate the sequence 2
- seq2.open();
-
- numEvent = 0;
- for (int i = 0; i < numEventsAndBuffers2; i++) {
- BufferOrEvent next = seq2.getNext();
- if (next.isEvent()) {
- BufferOrEvent expected = events2.get(numEvent++);
- assertEquals(expected.getEvent(), next.getEvent());
- assertEquals(expected.getChannelIndex(), next.getChannelIndex());
- }
- else {
- validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
- }
- }
- assertNull(seq2.getNext());
- assertEquals(events2.size(), numEvent);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- if (secondChannel != null) {
- try {
- secondChannel.close();
- }
- catch (IOException e) {
- // ignore here
- }
- }
- if (secondFile != null) {
- //noinspection ResultOfMethodCallIgnored
- secondFile.delete();
- }
- }
- }
-
- @Test
- public void testCleanup() {
- try {
- ByteBuffer data = ByteBuffer.allocate(157);
- data.order(ByteOrder.LITTLE_ENDIAN);
-
- fileChannel.write(data);
- fileChannel.position(54);
-
- SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
- seq.open();
- seq.cleanup();
-
- assertFalse(fileChannel.isOpen());
- assertFalse(tempFile.exists());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
- // Utils
- // ------------------------------------------------------------------------
-
- private static BufferOrEvent generateAndWriteEvent(FileChannel fileChannel, Random rnd, int numChannels) throws IOException {
- long magicNumber = rnd.nextLong();
- byte[] data = new byte[rnd.nextInt(1000)];
- rnd.nextBytes(data);
- TestEvent evt = new TestEvent(magicNumber, data);
-
- int channelIndex = rnd.nextInt(numChannels);
-
- ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
- ByteBuffer header = ByteBuffer.allocate(9);
- header.order(ByteOrder.LITTLE_ENDIAN);
-
- header.putInt(channelIndex);
- header.putInt(serializedEvent.remaining());
- header.put((byte) 1);
- header.flip();
-
- fileChannel.write(header);
- fileChannel.write(serializedEvent);
- return new BufferOrEvent(evt, channelIndex);
- }
-
- private static void writeBuffer(FileChannel fileChannel, int size, int channelIndex) throws IOException {
- ByteBuffer data = ByteBuffer.allocate(size + 9);
- data.order(ByteOrder.LITTLE_ENDIAN);
-
- data.putInt(channelIndex);
- data.putInt(size);
- data.put((byte) 0);
- for (int i = 0; i < size; i++) {
- data.put((byte) i);
- }
- data.flip();
- fileChannel.write(data);
- }
-
- private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
- assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
- assertTrue("is not buffer", boe.isBuffer());
-
- Buffer buf = boe.getBuffer();
- assertEquals("wrong buffer size", expectedSize, buf.getSize());
-
- MemorySegment seg = buf.getMemorySegment();
- for (int i = 0; i < expectedSize; i++) {
- assertEquals("wrong buffer contents", (byte) i, seg.get(i));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
deleted file mode 100644
index 45bbbda..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.types.LongValue;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * This test uses the PowerMockRunner runner to work around the fact that the
- * {@link ResultPartitionWriter} class is final.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-public class StreamRecordWriterTest {
-
- /**
- * Verifies that exceptions during flush from the output flush thread are
- * recognized in the writer.
- */
- @Test
- public void testPropagateAsyncFlushError() {
- FailingWriter<LongValue> testWriter = null;
- try {
- ResultPartitionWriter mockResultPartitionWriter = getMockWriter(5);
-
- // test writer that flushes every 5ms and fails after 3 flushes
- testWriter = new FailingWriter<LongValue>(mockResultPartitionWriter,
- new RoundRobinChannelSelector<LongValue>(), 5, 3);
-
- try {
- long deadline = System.currentTimeMillis() + 20000; // in max 20 seconds (conservative)
- long l = 0L;
-
- while (System.currentTimeMillis() < deadline) {
- testWriter.emit(new LongValue(l++));
- }
-
- fail("This should have failed with an exception");
- }
- catch (IOException e) {
- assertNotNull(e.getCause());
- assertTrue(e.getCause().getMessage().contains("Test Exception"));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- if (testWriter != null) {
- testWriter.close();
- }
- }
- }
-
- private static ResultPartitionWriter getMockWriter(int numPartitions) throws Exception {
- BufferProvider mockProvider = mock(BufferProvider.class);
- when(mockProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
- @Override
- public Buffer answer(InvocationOnMock invocation) {
- return new Buffer(
- MemorySegmentFactory.allocateUnpooledSegment(4096),
- FreeingBufferRecycler.INSTANCE);
- }
- });
-
- ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
- when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
- when(mockWriter.getNumberOfOutputChannels()).thenReturn(numPartitions);
-
-
- return mockWriter;
- }
-
- // ------------------------------------------------------------------------
-
- private static class FailingWriter<T extends IOReadableWritable> extends StreamRecordWriter<T> {
-
- private int flushesBeforeException;
-
- private FailingWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
- long timeout, int flushesBeforeException) {
- super(writer, channelSelector, timeout);
- this.flushesBeforeException = flushesBeforeException;
- }
-
- @Override
- public void flush() throws IOException {
- if (flushesBeforeException-- <= 0) {
- throw new IOException("Test Exception");
- }
- super.flush();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
deleted file mode 100644
index 286477a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.util.StringUtils;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * A simple task event, used for validation of buffer or event blocking/buffering.
- */
-public class TestEvent extends AbstractEvent {
-
- private long magicNumber;
-
- private byte[] payload;
-
- public TestEvent() {}
-
- public TestEvent(long magicNumber, byte[] payload) {
- this.magicNumber = magicNumber;
- this.payload = payload;
- }
-
-
- // ------------------------------------------------------------------------
- // Serialization
- // ------------------------------------------------------------------------
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeLong(magicNumber);
- out.writeInt(payload.length);
- out.write(payload);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- this.magicNumber = in.readLong();
- this.payload = new byte[in.readInt()];
- in.read(this.payload);
- }
-
- // ------------------------------------------------------------------------
- // Standard utilities
- // ------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return Long.valueOf(magicNumber).hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj != null && obj.getClass() == TestEvent.class) {
- TestEvent that = (TestEvent) obj;
- return this.magicNumber == that.magicNumber && Arrays.equals(this.payload, that.payload);
- }
- else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return String.format("TestEvent %d (%s)", magicNumber, StringUtils.byteToHexString(payload));
- }
-}
\ No newline at end of file