You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 10:03:48 UTC
[2/3] flink git commit: [FLINK-1706] Spilling BarrierBuffer added +
basic tests
[FLINK-1706] Spilling BarrierBuffer added + basic tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/650c528f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/650c528f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/650c528f
Branch: refs/heads/master
Commit: 650c528fc4452e6b11294062699001a5b4a21ac9
Parents: c0917f2
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Mar 17 17:39:27 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Mar 19 21:41:38 2015 +0100
----------------------------------------------------------------------
.../flink/runtime/io/network/buffer/Buffer.java | 4 +
.../runtime/io/network/buffer/BufferTest.java | 1 +
.../api/streamvertex/StreamingSuperstep.java | 8 ++
.../flink/streaming/io/BarrierBuffer.java | 54 +++++++----
.../flink/streaming/io/BufferSpiller.java | 85 ++++++++++++++++++
.../apache/flink/streaming/io/SpillReader.java | 82 +++++++++++++++++
.../streaming/io/SpillingBufferOrEvent.java | 94 ++++++++++++++++++++
.../flink/streaming/io/BarrierBufferTest.java | 34 ++++---
.../streaming/io/SpillingBufferOrEventTest.java | 94 ++++++++++++++++++++
9 files changed, 424 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 2ed82fa..2642521 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -89,6 +89,10 @@ public class Buffer {
return memorySegment.wrap(0, currentSize).duplicate();
}
}
+
+ public BufferRecycler getRecycler(){
+ return recycler;
+ }
public int getSize() {
synchronized (recycleLock) {
http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index 2630608..f2f9c09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -50,4 +50,5 @@ public class BufferTest {
// OK => expected exception
}
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
index 557c636..d46ca79 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
@@ -49,4 +49,12 @@ public class StreamingSuperstep extends TaskEvent {
public long getId() {
return id;
}
+
+ public boolean equals(Object other) {
+ if (other == null || !(other instanceof StreamingSuperstep)) {
+ return false;
+ } else {
+ return ((StreamingSuperstep) other).id == this.id;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
index 42d4919..ea422e5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -43,8 +43,8 @@ public class BarrierBuffer {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
- private Queue<BufferOrEvent> nonprocessed = new LinkedList<BufferOrEvent>();
- private Queue<BufferOrEvent> blockedNonprocessed = new LinkedList<BufferOrEvent>();
+ private Queue<SpillingBufferOrEvent> nonprocessed = new LinkedList<SpillingBufferOrEvent>();
+ private Queue<SpillingBufferOrEvent> blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
private Set<Integer> blockedChannels = new HashSet<Integer>();
private int totalNumberOfInputChannels;
@@ -56,10 +56,20 @@ public class BarrierBuffer {
private InputGate inputGate;
+ private SpillReader spillReader;
+ private BufferSpiller bufferSpiller;
+
public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
this.inputGate = inputGate;
totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
this.reader = reader;
+ try {
+ this.bufferSpiller = new BufferSpiller();
+ this.spillReader = new SpillReader();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
}
/**
@@ -77,28 +87,23 @@ public class BarrierBuffer {
}
/**
- * Buffers a bufferOrEvent received from a blocked channel
- *
- * @param bufferOrEvent
- * bufferOrEvent to buffer
- */
- protected void store(BufferOrEvent bufferOrEvent) {
- nonprocessed.add(bufferOrEvent);
- }
-
- /**
* Get then next non-blocked non-processed BufferOrEvent. Returns null if
* not available.
+ *
+ * @throws IOException
*/
- protected BufferOrEvent getNonProcessed() {
- BufferOrEvent nextNonprocessed;
+ protected BufferOrEvent getNonProcessed() throws IOException {
+ SpillingBufferOrEvent nextNonprocessed;
+
while ((nextNonprocessed = nonprocessed.poll()) != null) {
- if (isBlocked(nextNonprocessed.getChannelIndex())) {
- blockedNonprocessed.add(nextNonprocessed);
+ BufferOrEvent boe = nextNonprocessed.getBufferOrEvent();
+ if (isBlocked(boe.getChannelIndex())) {
+ blockedNonprocessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
} else {
- return nextNonprocessed;
+ return boe;
}
}
+
return null;
}
@@ -137,7 +142,8 @@ public class BarrierBuffer {
bufferOrEvent = inputGate.getNextBufferOrEvent();
if (isBlocked(bufferOrEvent.getChannelIndex())) {
// If channel blocked we just store it
- blockedNonprocessed.add(bufferOrEvent);
+ blockedNonprocessed.add(new SpillingBufferOrEvent(bufferOrEvent, bufferSpiller,
+ spillReader));
} else {
return bufferOrEvent;
}
@@ -168,6 +174,8 @@ public class BarrierBuffer {
/**
* Releases the blocks on all channels.
+ *
+ * @throws IOException
*/
protected void releaseBlocks() {
if (!nonprocessed.isEmpty()) {
@@ -175,7 +183,15 @@ public class BarrierBuffer {
throw new RuntimeException("Error in barrier buffer logic");
}
nonprocessed = blockedNonprocessed;
- blockedNonprocessed = new LinkedList<BufferOrEvent>();
+ blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
+
+ try {
+ spillReader.setSpillFile(bufferSpiller.getSpillFile());
+ bufferSpiller.resetSpillFile();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
blockedChannels.clear();
superstepStarted = false;
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
new file mode 100644
index 0000000..e824430
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.StringUtils;
+
+public class BufferSpiller {
+
+ protected static Random rnd = new Random();
+
+ private File spillFile;
+ protected FileChannel spillingChannel;
+ private String tempDir;
+
+ public BufferSpiller() throws IOException {
+ String tempDirString = GlobalConfiguration.getString(
+ ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
+ String[] tempDirs = tempDirString.split(",|" + File.pathSeparator);
+
+ tempDir = tempDirs[rnd.nextInt(tempDirs.length)];
+
+ createSpillingChannel();
+ }
+
+ /**
+ * Dumps the contents of the buffer to disk and recycles the buffer.
+ */
+ public void spill(Buffer buffer) throws IOException {
+ spillingChannel.write(buffer.getNioBuffer());
+ buffer.recycle();
+ }
+
+ @SuppressWarnings("resource")
+ private void createSpillingChannel() throws IOException {
+ this.spillFile = new File(tempDir, randomString(rnd) + ".buffer");
+ this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
+ }
+
+ private static String randomString(Random random) {
+ final byte[] bytes = new byte[20];
+ random.nextBytes(bytes);
+ return StringUtils.byteToHexString(bytes);
+ }
+
+ public void close() throws IOException {
+ if (spillingChannel != null && spillingChannel.isOpen()) {
+ spillingChannel.close();
+ }
+ }
+
+ public void resetSpillFile() throws IOException {
+ close();
+ createSpillingChannel();
+ }
+
+ public File getSpillFile() {
+ return spillFile;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
new file mode 100644
index 0000000..3526bea
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+public class SpillReader {
+
+ private FileChannel spillingChannel;
+ private File spillFile;
+
+ /**
+ * Reads the next buffer from the spilled file. If a buffer pool was given,
+ * uses the buffer pool to request a new buffer to read into.
+ *
+ */
+ public Buffer readNextBuffer(int bufferSize, BufferPool bufferPool) throws IOException {
+
+ Buffer buffer = null;
+
+ // If available tries to request a new buffer from the pool
+ if (bufferPool != null) {
+ buffer = bufferPool.requestBuffer();
+ }
+
+ // If no bufferpool provided or the pool was empty create a new buffer
+ if (buffer == null) {
+ buffer = new Buffer(new MemorySegment(new byte[bufferSize]), new BufferRecycler() {
+
+ @Override
+ public void recycle(MemorySegment memorySegment) {
+ memorySegment.free();
+ }
+ });
+ }
+
+ spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize));
+
+ return buffer;
+ }
+
+ @SuppressWarnings("resource")
+ public void setSpillFile(File nextSpillFile) throws IOException {
+ // We can close and delete the file now
+ close();
+ if (spillFile != null) {
+ spillFile.delete();
+ }
+ this.spillFile = nextSpillFile;
+ this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
+ }
+
+ public void close() throws IOException {
+ if (this.spillingChannel != null && this.spillingChannel.isOpen()) {
+ this.spillingChannel.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
new file mode 100644
index 0000000..40713e2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+public class SpillingBufferOrEvent {
+
+ private BufferOrEvent boe;
+ private boolean isSpilled = false;
+
+ private SpillReader spillReader;
+
+ private BufferPool bufferPool;
+
+ private int channelIndex;
+ private int bufferSize;
+
+ public SpillingBufferOrEvent(BufferOrEvent boe, BufferSpiller spiller, SpillReader reader)
+ throws IOException {
+
+ this.boe = boe;
+ this.spillReader = reader;
+
+ if (shouldSpill()) {
+ spiller.spill(boe.getBuffer());
+ isSpilled = true;
+ boe = null;
+ }
+ }
+
+ /**
+ * If the buffer wasn't spilled simply returns the instance from the field,
+ * otherwise reads it from the spill reader
+ */
+ public BufferOrEvent getBufferOrEvent() throws IOException {
+ if (isSpilled) {
+ return new BufferOrEvent(spillReader.readNextBuffer(bufferSize, bufferPool),
+ channelIndex);
+ } else {
+ return boe;
+ }
+ }
+
+ /**
+ * Checks whether a given buffer should be spilled to disk. Currently it
+ * checks whether the buffer pool from which the buffer was supplied is
+ * empty and only spills if it is. This avoids out of memory exceptions and
+ * also blocks at the input gate.
+ */
+ private boolean shouldSpill() throws IOException {
+ if (boe.isBuffer()) {
+ Buffer buffer = boe.getBuffer();
+ this.bufferSize = buffer.getSize();
+ BufferRecycler recycler = buffer.getRecycler();
+
+ if (recycler instanceof BufferPool) {
+ bufferPool = (BufferPool) recycler;
+ Buffer nextBuffer = bufferPool.requestBuffer();
+ if (nextBuffer == null) {
+ return true;
+ } else {
+ nextBuffer.recycle();
+ }
+ }
+ }
+
+ return false;
+ }
+
+ public boolean isSpilled() {
+ return isSpilled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
index 1b4cc36..2b8a218 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -110,28 +110,36 @@ public class BarrierBufferTest {
BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
BufferOrEvent nextBoe;
- assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
+ check(input.get(0), nextBoe = bb.getNextNonBlocked());
+ check(input.get(1), nextBoe = bb.getNextNonBlocked());
+ check(input.get(2), nextBoe = bb.getNextNonBlocked());
bb.processSuperstep(nextBoe);
- assertEquals(input.get(7), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(8), nextBoe = bb.getNextNonBlocked());
+ check(input.get(7), nextBoe = bb.getNextNonBlocked());
+ check(input.get(8), nextBoe = bb.getNextNonBlocked());
bb.processSuperstep(nextBoe);
- assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
+ check(input.get(3), nextBoe = bb.getNextNonBlocked());
bb.processSuperstep(nextBoe);
- assertEquals(input.get(10), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(11), nextBoe = bb.getNextNonBlocked());
+ check(input.get(10), nextBoe = bb.getNextNonBlocked());
+ check(input.get(11), nextBoe = bb.getNextNonBlocked());
bb.processSuperstep(nextBoe);
- assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
+ check(input.get(4), nextBoe = bb.getNextNonBlocked());
+ check(input.get(5), nextBoe = bb.getNextNonBlocked());
bb.processSuperstep(nextBoe);
- assertEquals(input.get(12), nextBoe = bb.getNextNonBlocked());
+ check(input.get(12), nextBoe = bb.getNextNonBlocked());
bb.processSuperstep(nextBoe);
- assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(9), nextBoe = bb.getNextNonBlocked());
+ check(input.get(6), nextBoe = bb.getNextNonBlocked());
+ check(input.get(9), nextBoe = bb.getNextNonBlocked());
}
+ private static void check(BufferOrEvent expected, BufferOrEvent actual) {
+ assertEquals(expected.isBuffer(), actual.isBuffer());
+ assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
+ if (expected.isEvent()) {
+ assertEquals(expected.getEvent(), actual.getEvent());
+ }
+ }
+
protected static class MockInputGate implements InputGate {
private int numChannels;
http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
new file mode 100644
index 0000000..2f28f90
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.junit.Test;
+
+public class SpillingBufferOrEventTest {
+
+ @Test
+ public void testSpilling() throws IOException, InterruptedException {
+ BufferSpiller bsp = new BufferSpiller();
+ SpillReader spr = new SpillReader();
+
+ BufferPool pool1 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
+ BufferPool pool2 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
+
+ Buffer b1 = pool1.requestBuffer();
+ b1.getMemorySegment().putInt(0, 10000);
+ BufferOrEvent boe1 = new BufferOrEvent(b1, 0);
+ SpillingBufferOrEvent sboe1 = new SpillingBufferOrEvent(boe1, bsp, spr);
+
+ assertFalse(sboe1.isSpilled());
+
+ Buffer b2 = pool2.requestBuffer();
+ b2.getMemorySegment().putInt(0, 10000);
+ BufferOrEvent boe2 = new BufferOrEvent(b2, 0);
+ SpillingBufferOrEvent sboe2 = new SpillingBufferOrEvent(boe2, bsp, spr);
+
+ assertFalse(sboe2.isSpilled());
+
+ Buffer b3 = pool1.requestBuffer();
+ b3.getMemorySegment().putInt(0, 50000);
+ BufferOrEvent boe3 = new BufferOrEvent(b3, 0);
+ SpillingBufferOrEvent sboe3 = new SpillingBufferOrEvent(boe3, bsp, spr);
+
+ assertTrue(sboe3.isSpilled());
+
+ Buffer b4 = pool2.requestBuffer();
+ b4.getMemorySegment().putInt(0, 60000);
+ BufferOrEvent boe4 = new BufferOrEvent(b4, 0);
+ SpillingBufferOrEvent sboe4 = new SpillingBufferOrEvent(boe4, bsp, spr);
+
+ assertTrue(sboe4.isSpilled());
+
+ bsp.close();
+
+ spr.setSpillFile(bsp.getSpillFile());
+
+ Buffer b1ret = sboe1.getBufferOrEvent().getBuffer();
+ assertEquals(10000, b1ret.getMemorySegment().getInt(0));
+ b1ret.recycle();
+
+ Buffer b2ret = sboe2.getBufferOrEvent().getBuffer();
+ assertEquals(10000, b2ret.getMemorySegment().getInt(0));
+ b2ret.recycle();
+
+ Buffer b3ret = sboe3.getBufferOrEvent().getBuffer();
+ assertEquals(50000, b3ret.getMemorySegment().getInt(0));
+ b3ret.recycle();
+
+ Buffer b4ret = sboe4.getBufferOrEvent().getBuffer();
+ assertEquals(60000, b4ret.getMemorySegment().getInt(0));
+ b4ret.recycle();
+
+ spr.close();
+ bsp.getSpillFile().delete();
+
+ }
+}