You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/05 20:20:15 UTC
[1/2] flink git commit: [FLINK-2484] [streaming] BarrierBuffer
releases temp files properly.
Repository: flink
Updated Branches:
refs/heads/master fb7e63422 -> 3e73496c5
[FLINK-2484] [streaming] BarrierBuffer releases temp files properly.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d738430c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d738430c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d738430c
Branch: refs/heads/master
Commit: d738430cb7e26b5e31d953efa5e0036082d8de6e
Parents: fb7e634
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 4 15:18:32 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 5 17:30:01 2015 +0200
----------------------------------------------------------------------
.../streaming/runtime/io/BarrierBuffer.java | 3 +-
.../streaming/runtime/io/BarrierBufferTest.java | 33 +++++++++++++++++++-
2 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d738430c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index b7766ee..fd896c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -76,7 +76,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
/**
*
- * @param inputGate Teh input gate to draw the buffers and events from.
+ * @param inputGate The input gate to draw the buffers and events from.
* @param ioManager The I/O manager that gives access to the temp directories.
*
* @throws IOException Thrown, when the spilling to temp files cannot be initialized.
@@ -102,6 +102,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (currentBuffered != null) {
next = currentBuffered.getNext();
if (next == null) {
+ currentBuffered.cleanup();
currentBuffered = queuedBuffered.pollFirst();
if (currentBuffered != null) {
currentBuffered.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/d738430c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 872e226..dd4d395 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -34,6 +34,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.File;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
@@ -94,6 +95,8 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
buffer.cleanup();
+
+ checkNoTempFilesRemain();
}
catch (Exception e) {
e.printStackTrace();
@@ -125,6 +128,8 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
buffer.cleanup();
+
+ checkNoTempFilesRemain();
}
catch (Exception e) {
e.printStackTrace();
@@ -166,6 +171,8 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
buffer.cleanup();
+
+ checkNoTempFilesRemain();
}
catch (Exception e) {
e.printStackTrace();
@@ -283,6 +290,8 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
buffer.cleanup();
+
+ checkNoTempFilesRemain();
}
catch (Exception e) {
e.printStackTrace();
@@ -338,8 +347,10 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
- buffer.cleanup();
+ buffer.cleanup();
+
+ checkNoTempFilesRemain();
}
catch (Exception e) {
e.printStackTrace();
@@ -456,6 +467,8 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
buffer.cleanup();
+
+ checkNoTempFilesRemain();
}
catch (Exception e) {
e.printStackTrace();
@@ -533,6 +546,8 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
buffer.cleanup();
+
+ checkNoTempFilesRemain();
}
catch (Exception e) {
e.printStackTrace();
@@ -620,6 +635,8 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
buffer.cleanup();
+
+ checkNoTempFilesRemain();
}
catch (Exception e) {
e.printStackTrace();
@@ -719,6 +736,8 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
buffer.cleanup();
+
+ checkNoTempFilesRemain();
}
catch (Exception e) {
e.printStackTrace();
@@ -769,6 +788,7 @@ public class BarrierBufferTest {
buffer.getNextNonBlocked();
buffer.cleanup();
+ checkNoTempFilesRemain();
}
catch (Exception e) {
e.printStackTrace();
@@ -811,6 +831,17 @@ public class BarrierBufferTest {
}
}
+ private static void checkNoTempFilesRemain() {
+ // validate that all temp files have been removed
+ for (File dir : IO_MANAGER.getSpillingDirectories()) {
+ for (String file : dir.list()) {
+ if (file != null && !(file.equals(".") || file.equals(".."))) {
+ fail("barrier buffer did not clean up temp files. remaining file: " + file);
+ }
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// Testing Mocks
// ------------------------------------------------------------------------
[2/2] flink git commit: [FLINK-2464] [tests] Make buffer spilling
test robust to Java 6.
Posted by se...@apache.org.
[FLINK-2464] [tests] Make buffer spilling test robust to Java 6.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e73496c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e73496c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e73496c
Branch: refs/heads/master
Commit: 3e73496c59c6e468be265afd931370efa6ba2d8a
Parents: d738430
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 5 17:16:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 5 17:31:55 2015 +0200
----------------------------------------------------------------------
.../streaming/runtime/io/BufferSpillerTest.java | 221 ++++++++++---------
1 file changed, 112 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3e73496c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index fbc19ec..355b7c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -30,14 +30,15 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
+
import static org.junit.Assert.*;
@@ -91,6 +92,8 @@ public class BufferSpillerTest {
assertFalse(spiller.getCurrentChannel().isOpen());
assertFalse(spiller.getCurrentSpillFile().exists());
}
+
+ checkNoTempFilesRemain();
}
// ------------------------------------------------------------------------
@@ -189,14 +192,17 @@ public class BufferSpillerTest {
final int sequences = 10;
final Random rnd = new Random();
- final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
- final SequenceConsumer consumer = new SequenceConsumer(error, sequences);
- consumer.start();
final int maxNumEventsAndBuffers = 30000;
final int maxNumChannels = 1656;
+ int sequencesConsumed = 0;
+
+ ArrayDeque<SequenceToConsume> pendingSequences = new ArrayDeque<SequenceToConsume>();
+ SequenceToConsume currentSequence = null;
+ int currentNumEvents = 0;
+ int currentNumRecordAndEvents = 0;
+
// do multiple spilling / rolling over rounds
for (int round = 0; round < 2*sequences; round++) {
@@ -214,43 +220,110 @@ public class BufferSpillerTest {
final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
- // generate sequence
- for (int i = 0; i < numEventsAndBuffers; i++) {
- boolean isEvent = rnd.nextDouble() < 0.05d;
- if (isEvent) {
- BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
- events.add(evt);
- spiller.add(evt);
+ int generated = 0;
+ while (generated < numEventsAndBuffers) {
+
+ if (currentSequence == null || rnd.nextDouble() < 0.5) {
+ // add a new record
+ boolean isEvent = rnd.nextDouble() < 0.05;
+ if (isEvent) {
+ BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
+ events.add(evt);
+ spiller.add(evt);
+ }
+ else {
+ BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+ spiller.add(evt);
+ }
+ generated++;
}
else {
- BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
- spiller.add(evt);
+ // consume a record
+ BufferOrEvent next = currentSequence.sequence.getNext();
+ assertNotNull(next);
+ if (next.isEvent()) {
+ BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
+ assertEquals(expected.getEvent(), next.getEvent());
+ assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+ }
+ else {
+ Random validationRnd = currentSequence.bufferRnd;
+ validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels));
+ }
+
+ currentNumRecordAndEvents++;
+ if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
+ // done with the sequence
+ currentSequence.sequence.cleanup();
+ sequencesConsumed++;
+
+ // validate we had all events
+ assertEquals(currentSequence.events.size(), currentNumEvents);
+
+ // reset
+ currentSequence = pendingSequences.pollFirst();
+ if (currentSequence != null) {
+ currentSequence.sequence.open();
+ }
+
+ currentNumRecordAndEvents = 0;
+ currentNumEvents = 0;
+ }
}
}
- // reset and create reader
+ // done generating a sequence. queue it for consumption
bufferRnd.setSeed(bufferSeed);
BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels);
- consumer.queue(stc);
+
+ if (currentSequence == null) {
+ currentSequence = stc;
+ stc.sequence.open();
+ }
+ else {
+ pendingSequences.addLast(stc);
+ }
}
}
- // wait for the consumer
- consumer.join(180000);
- assertFalse("sequence consumer did not finish its work in time", consumer.isAlive());
-
- // validate there was no error in the consumer
- if (error.get() != null) {
- Throwable t = error.get();
- if (t instanceof Error) {
- throw (Error) t;
+ // consume all the remainder
+ while (currentSequence != null) {
+ // consume a record
+ BufferOrEvent next = currentSequence.sequence.getNext();
+ assertNotNull(next);
+ if (next.isEvent()) {
+ BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
+ assertEquals(expected.getEvent(), next.getEvent());
+ assertEquals(expected.getChannelIndex(), next.getChannelIndex());
}
else {
- throw new Exception("Error while consuming the spilled records", t);
+ Random validationRnd = currentSequence.bufferRnd;
+ validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels));
+ }
+
+ currentNumRecordAndEvents++;
+ if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
+ // done with the sequence
+ currentSequence.sequence.cleanup();
+ sequencesConsumed++;
+
+ // validate we had all events
+ assertEquals(currentSequence.events.size(), currentNumEvents);
+
+ // reset
+ currentSequence = pendingSequences.pollFirst();
+ if (currentSequence != null) {
+ currentSequence.sequence.open();
+ }
+
+ currentNumRecordAndEvents = 0;
+ currentNumEvents = 0;
}
}
+
+ assertEquals(sequences, sequencesConsumed);
}
catch (Exception e) {
e.printStackTrace();
@@ -300,10 +373,17 @@ public class BufferSpillerTest {
}
}
}
-
- // ------------------------------------------------------------------------
- // Async Consumer
- // ------------------------------------------------------------------------
+
+ private static void checkNoTempFilesRemain() {
+ // validate that all temp files have been removed
+ for (File dir : IO_MANAGER.getSpillingDirectories()) {
+ for (String file : dir.list()) {
+ if (file != null && !(file.equals(".") || file.equals(".."))) {
+ fail("barrier buffer did not clean up temp files. remaining file: " + file);
+ }
+ }
+ }
+ }
private static class SequenceToConsume {
@@ -323,81 +403,4 @@ public class BufferSpillerTest {
this.numChannels = numChannels;
}
}
-
- private static class SequenceConsumer extends Thread {
-
- private final AtomicReference<Throwable> error;
- private final BlockingQueue<SequenceToConsume> sequences;
-
- private final int numSequencesToConsume;
-
- private int consumedSequences;
-
- private SequenceConsumer(AtomicReference<Throwable> error, int numSequencesToConsume) {
- super("Sequence Consumer");
- setDaemon(true);
-
- this.error = error;
- this.numSequencesToConsume = numSequencesToConsume;
- this.sequences = new LinkedBlockingQueue<SequenceToConsume>();
- }
-
-
- @Override
- public void run() {
- try {
- while (consumedSequences < numSequencesToConsume) {
- // get next sequence
- SequenceToConsume nextSequence = sequences.take();
-
- // wait a bit, allow some stuff to queue up
- Thread.sleep(50);
-
- BufferSpiller.SpilledBufferOrEventSequence seq = nextSequence.sequence;
- ArrayList<BufferOrEvent> events = nextSequence.events;
- Random bufferRnd = nextSequence.bufferRnd;
- int numBuffersAndEvents = nextSequence.numBuffersAndEvents;
- int numChannels = nextSequence.numChannels;
-
- LOG.info("Reading sequence {}", consumedSequences);
-
- // consume sequence
- seq.open();
-
- int numEvent = 0;
- for (int i = 0; i < numBuffersAndEvents; i++) {
- BufferOrEvent next = seq.getNext();
- assertNotNull(next);
- if (next.isEvent()) {
- BufferOrEvent expected = events.get(numEvent++);
- assertEquals(expected.getEvent(), next.getEvent());
- assertEquals(expected.getChannelIndex(), next.getChannelIndex());
- }
- else {
- validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
- }
- }
-
- // no further data
- assertNull(seq.getNext());
-
- // all events need to be consumed
- assertEquals(events.size(), numEvent);
-
- // remove all temp files
- seq.cleanup();
-
- consumedSequences++;
- }
-
- }
- catch (Throwable t) {
- error.set(t);
- }
- }
-
- public void queue(SequenceToConsume next) {
- sequences.add(next);
- }
- }
}