You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/05 20:20:15 UTC

[1/2] flink git commit: [FLINK-2484] [streaming] BarrierBuffer releases temp files properly.

Repository: flink
Updated Branches:
  refs/heads/master fb7e63422 -> 3e73496c5


[FLINK-2484] [streaming] BarrierBuffer releases temp files properly.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d738430c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d738430c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d738430c

Branch: refs/heads/master
Commit: d738430cb7e26b5e31d953efa5e0036082d8de6e
Parents: fb7e634
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 4 15:18:32 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 5 17:30:01 2015 +0200

----------------------------------------------------------------------
 .../streaming/runtime/io/BarrierBuffer.java     |  3 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 33 +++++++++++++++++++-
 2 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d738430c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index b7766ee..fd896c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -76,7 +76,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	/**
 	 * 
-	 * @param inputGate Teh input gate to draw the buffers and events from.
+	 * @param inputGate The input gate to draw the buffers and events from.
 	 * @param ioManager The I/O manager that gives access to the temp directories.
 	 * 
 	 * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
@@ -102,6 +102,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			if (currentBuffered != null) {
 				next = currentBuffered.getNext();
 				if (next == null) {
+					currentBuffered.cleanup();
 					currentBuffered = queuedBuffered.pollFirst();
 					if (currentBuffered != null) {
 						currentBuffered.open();

http://git-wip-us.apache.org/repos/asf/flink/blob/d738430c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 872e226..dd4d395 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -34,6 +34,7 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.List;
@@ -94,6 +95,8 @@ public class BarrierBufferTest {
 			assertNull(buffer.getNextNonBlocked());
 			
 			buffer.cleanup();
+
+			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -125,6 +128,8 @@ public class BarrierBufferTest {
 			assertNull(buffer.getNextNonBlocked());
 
 			buffer.cleanup();
+
+			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -166,6 +171,8 @@ public class BarrierBufferTest {
 			assertNull(buffer.getNextNonBlocked());
 
 			buffer.cleanup();
+
+			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -283,6 +290,8 @@ public class BarrierBufferTest {
 			assertNull(buffer.getNextNonBlocked());
 
 			buffer.cleanup();
+
+			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -338,8 +347,10 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
-			buffer.cleanup();
 			
+			buffer.cleanup();
+
+			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -456,6 +467,8 @@ public class BarrierBufferTest {
 			assertNull(buffer.getNextNonBlocked());
 
 			buffer.cleanup();
+
+			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -533,6 +546,8 @@ public class BarrierBufferTest {
 			assertNull(buffer.getNextNonBlocked());
 			
 			buffer.cleanup();
+
+			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -620,6 +635,8 @@ public class BarrierBufferTest {
 			assertNull(buffer.getNextNonBlocked());
 
 			buffer.cleanup();
+
+			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -719,6 +736,8 @@ public class BarrierBufferTest {
 			assertNull(buffer.getNextNonBlocked());
 
 			buffer.cleanup();
+
+			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -769,6 +788,7 @@ public class BarrierBufferTest {
 			buffer.getNextNonBlocked();
 			buffer.cleanup();
 
+			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -811,6 +831,17 @@ public class BarrierBufferTest {
 		}
 	}
 	
+	private static void checkNoTempFilesRemain() {
+		// validate that all temp files have been removed
+		for (File dir : IO_MANAGER.getSpillingDirectories()) {
+			for (String file : dir.list()) {
+				if (file != null && !(file.equals(".") || file.equals(".."))) {
+					fail("barrier buffer did not clean up temp files. remaining file: " + file);
+				}
+			}
+		}
+	}
+	
 	// ------------------------------------------------------------------------
 	//  Testing Mocks
 	// ------------------------------------------------------------------------


[2/2] flink git commit: [FLINK-2464] [tests] Make buffer spilling test robust to Java 6.

Posted by se...@apache.org.
[FLINK-2464] [tests] Make buffer spilling test robust to Java 6.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e73496c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e73496c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e73496c

Branch: refs/heads/master
Commit: 3e73496c59c6e468be265afd931370efa6ba2d8a
Parents: d738430
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 5 17:16:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 5 17:31:55 2015 +0200

----------------------------------------------------------------------
 .../streaming/runtime/io/BufferSpillerTest.java | 221 ++++++++++---------
 1 file changed, 112 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e73496c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index fbc19ec..355b7c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -30,14 +30,15 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
+
 
 import static org.junit.Assert.*;
 
@@ -91,6 +92,8 @@ public class BufferSpillerTest {
 			assertFalse(spiller.getCurrentChannel().isOpen());
 			assertFalse(spiller.getCurrentSpillFile().exists());
 		}
+		
+		checkNoTempFilesRemain();
 	}
 
 	// ------------------------------------------------------------------------
@@ -189,14 +192,17 @@ public class BufferSpillerTest {
 			final int sequences = 10;
 			
 			final Random rnd = new Random();
-			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-			
-			final SequenceConsumer consumer = new SequenceConsumer(error, sequences);
-			consumer.start();
 			
 			final int maxNumEventsAndBuffers = 30000;
 			final int maxNumChannels = 1656;
 			
+			int sequencesConsumed = 0;
+			
+			ArrayDeque<SequenceToConsume> pendingSequences = new ArrayDeque<SequenceToConsume>();
+			SequenceToConsume currentSequence = null;
+			int currentNumEvents = 0;
+			int currentNumRecordAndEvents = 0;
+			
 			// do multiple spilling / rolling over rounds
 			for (int round = 0; round < 2*sequences; round++) {
 
@@ -214,43 +220,110 @@ public class BufferSpillerTest {
 	
 					final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
 	
-					// generate sequence
-					for (int i = 0; i < numEventsAndBuffers; i++) {
-						boolean isEvent = rnd.nextDouble() < 0.05d;
-						if (isEvent) {
-							BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
-							events.add(evt);
-							spiller.add(evt);
+					int generated = 0;
+					while (generated < numEventsAndBuffers) {
+						
+						if (currentSequence == null || rnd.nextDouble() < 0.5) {
+							// add a new record
+							boolean isEvent = rnd.nextDouble() < 0.05;
+							if (isEvent) {
+								BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
+								events.add(evt);
+								spiller.add(evt);
+							}
+							else {
+								BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
+								spiller.add(evt);
+							}
+							generated++;
 						}
 						else {
-							BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
-							spiller.add(evt);
+							// consume a record
+							BufferOrEvent next = currentSequence.sequence.getNext();
+							assertNotNull(next);
+							if (next.isEvent()) {
+								BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
+								assertEquals(expected.getEvent(), next.getEvent());
+								assertEquals(expected.getChannelIndex(), next.getChannelIndex());
+							}
+							else {
+								Random validationRnd = currentSequence.bufferRnd;
+								validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels));
+							}
+							
+							currentNumRecordAndEvents++;
+							if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
+								// done with the sequence
+								currentSequence.sequence.cleanup();
+								sequencesConsumed++;
+								
+								// validate we had all events
+								assertEquals(currentSequence.events.size(), currentNumEvents);
+								
+								// reset
+								currentSequence = pendingSequences.pollFirst();
+								if (currentSequence != null) {
+									currentSequence.sequence.open();
+								}
+								
+								currentNumRecordAndEvents = 0;
+								currentNumEvents = 0;
+							}
 						}
 					}
 	
-					// reset and create reader
+					// done generating a sequence. queue it for consumption
 					bufferRnd.setSeed(bufferSeed);
 					BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
 					
 					SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels);
-					consumer.queue(stc);
+					
+					if (currentSequence == null) {
+						currentSequence = stc;
+						stc.sequence.open();
+					}
+					else {
+						pendingSequences.addLast(stc);
+					}
 				}
 			}
 			
-			// wait for the consumer
-			consumer.join(180000);
-			assertFalse("sequence consumer did not finish its work in time", consumer.isAlive());
-			
-			// validate there was no error in the consumer
-			if (error.get() != null) {
-				Throwable t = error.get();
-				if (t instanceof Error) {
-					throw (Error) t;
+			// consume all the remainder
+			while (currentSequence != null) {
+				// consume a record
+				BufferOrEvent next = currentSequence.sequence.getNext();
+				assertNotNull(next);
+				if (next.isEvent()) {
+					BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
+					assertEquals(expected.getEvent(), next.getEvent());
+					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
 				}
 				else {
-					throw new Exception("Error while consuming the spilled records", t);
+					Random validationRnd = currentSequence.bufferRnd;
+					validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels));
+				}
+
+				currentNumRecordAndEvents++;
+				if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
+					// done with the sequence
+					currentSequence.sequence.cleanup();
+					sequencesConsumed++;
+
+					// validate we had all events
+					assertEquals(currentSequence.events.size(), currentNumEvents);
+
+					// reset
+					currentSequence = pendingSequences.pollFirst();
+					if (currentSequence != null) {
+						currentSequence.sequence.open();
+					}
+
+					currentNumRecordAndEvents = 0;
+					currentNumEvents = 0;
 				}
 			}
+			
+			assertEquals(sequences, sequencesConsumed);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -300,10 +373,17 @@ public class BufferSpillerTest {
 			}
 		}
 	}
-	
-	// ------------------------------------------------------------------------
-	//  Async Consumer
-	// ------------------------------------------------------------------------
+
+	private static void checkNoTempFilesRemain() {
+		// validate that all temp files have been removed
+		for (File dir : IO_MANAGER.getSpillingDirectories()) {
+			for (String file : dir.list()) {
+				if (file != null && !(file.equals(".") || file.equals(".."))) {
+					fail("barrier buffer did not clean up temp files. remaining file: " + file);
+				}
+			}
+		}
+	}
 	
 	private static class SequenceToConsume {
 
@@ -323,81 +403,4 @@ public class BufferSpillerTest {
 			this.numChannels = numChannels;
 		}
 	}
-	
-	private static class SequenceConsumer extends Thread {
-		
-		private final AtomicReference<Throwable> error;
-		private final BlockingQueue<SequenceToConsume> sequences;
-		
-		private final int numSequencesToConsume;
-		
-		private int consumedSequences;
-
-		private SequenceConsumer(AtomicReference<Throwable> error, int numSequencesToConsume) {
-			super("Sequence Consumer");
-			setDaemon(true);
-			
-			this.error = error;
-			this.numSequencesToConsume = numSequencesToConsume;
-			this.sequences = new LinkedBlockingQueue<SequenceToConsume>();
-		}
-
-
-		@Override
-		public void run() {
-			try {
-				while (consumedSequences < numSequencesToConsume) {
-					// get next sequence
-					SequenceToConsume nextSequence = sequences.take();
-				
-					// wait a bit, allow some stuff to queue up
-					Thread.sleep(50);
-
-					BufferSpiller.SpilledBufferOrEventSequence seq = nextSequence.sequence;
-					ArrayList<BufferOrEvent> events = nextSequence.events;
-					Random bufferRnd = nextSequence.bufferRnd;
-					int numBuffersAndEvents = nextSequence.numBuffersAndEvents;
-					int numChannels = nextSequence.numChannels;
-
-					LOG.info("Reading sequence {}", consumedSequences);
-					
-					// consume sequence
-					seq.open();
-					
-					int numEvent = 0;
-					for (int i = 0; i < numBuffersAndEvents; i++) {
-						BufferOrEvent next = seq.getNext();
-						assertNotNull(next);
-						if (next.isEvent()) {
-							BufferOrEvent expected = events.get(numEvent++);
-							assertEquals(expected.getEvent(), next.getEvent());
-							assertEquals(expected.getChannelIndex(), next.getChannelIndex());
-						}
-						else {
-							validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
-						}
-					}
-	
-					// no further data
-					assertNull(seq.getNext());
-	
-					// all events need to be consumed
-					assertEquals(events.size(), numEvent);
-	
-					// remove all temp files
-					seq.cleanup();
-					
-					consumedSequences++;
-				}
-				
-			}
-			catch (Throwable t) {
-				error.set(t);
-			}
-		}
-		
-		public void queue(SequenceToConsume next) {
-			sequences.add(next);
-		}
-	}
 }