You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:10 UTC

[flink] 01/16: [hotfix][network] Rename BufferBlocker to BufferStorage

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff8a94612c2e80189f9ba883e58c8fd599d4bb45
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jun 13 13:28:13 2019 +0200

    [hotfix][network] Rename BufferBlocker to BufferStorage
---
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 24 +++++++++----------
 .../flink/streaming/runtime/io/BufferSpiller.java  |  6 ++---
 .../io/{BufferBlocker.java => BufferStorage.java}  | 10 ++++----
 ...BufferBlocker.java => CachedBufferStorage.java} | 18 +++++++-------
 .../streaming/runtime/io/InputProcessorUtil.java   |  2 +-
 .../runtime/io/BarrierBufferTestBase.java          |  2 +-
 .../streaming/runtime/io/BufferSpillerTest.java    |  4 ++--
 ...kerTestBase.java => BufferStorageTestBase.java} | 28 +++++++++++-----------
 ...ockerTest.java => CachedBufferStorageTest.java} | 20 ++++++++--------
 .../runtime/io/CreditBasedBarrierBufferTest.java   |  4 ++--
 10 files changed, 60 insertions(+), 58 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 63fa1ac..ad62360 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -64,7 +64,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private final int totalNumberOfInputChannels;
 
 	/** To utility to write blocked data to a file channel. */
-	private final BufferBlocker bufferBlocker;
+	private final BufferStorage bufferStorage;
 
 	/**
 	 * The pending blocked buffer/event sequences. Must be consumed before requesting further data
@@ -123,11 +123,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 * <p>There is no limit to how much data may be buffered during an alignment.
 	 *
 	 * @param inputGate The input gate to draw the buffers and events from.
-	 * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier.
+	 * @param bufferStorage The storage to hold the buffers and events for blocked channels.
 	 */
 	@VisibleForTesting
-	BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) {
-		this (inputGate, bufferBlocker, -1, "Testing: No task associated");
+	BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
+		this (inputGate, bufferStorage, -1, "Testing: No task associated");
 	}
 
 	/**
@@ -138,11 +138,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 * checkpoint has been cancelled.
 	 *
 	 * @param inputGate The input gate to draw the buffers and events from.
-	 * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier.
+	 * @param bufferStorage The storage to hold the buffers and events for blocked channels.
 	 * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
 	 * @param taskName The task name for logging.
 	 */
-	BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes, String taskName) {
+	BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage, long maxBufferedBytes, String taskName) {
 		checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
 
 		this.inputGate = inputGate;
@@ -150,7 +150,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
 		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
 
-		this.bufferBlocker = checkNotNull(bufferBlocker);
+		this.bufferStorage = checkNotNull(bufferStorage);
 		this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();
 
 		this.taskName = taskName;
@@ -192,7 +192,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			BufferOrEvent bufferOrEvent = next.get();
 			if (isBlocked(bufferOrEvent.getChannelIndex())) {
 				// if the channel is blocked, we just store the BufferOrEvent
-				bufferBlocker.add(bufferOrEvent);
+				bufferStorage.add(bufferOrEvent);
 				checkSizeLimit();
 			}
 			else if (bufferOrEvent.isBuffer()) {
@@ -436,7 +436,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	}
 
 	private void checkSizeLimit() throws Exception {
-		if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
+		if (maxBufferedBytes > 0 && (numQueuedBytes + bufferStorage.getBytesBlocked()) > maxBufferedBytes) {
 			// exceeded our limit - abort this checkpoint
 			LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
 				taskName,
@@ -473,7 +473,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	@Override
 	public void cleanup() throws IOException {
-		bufferBlocker.close();
+		bufferStorage.close();
 		if (currentBuffered != null) {
 			currentBuffered.cleanup();
 		}
@@ -538,7 +538,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
 		if (currentBuffered == null) {
 			// common case: no more buffered data
-			currentBuffered = bufferBlocker.rollOverReusingResources();
+			currentBuffered = bufferStorage.rollOverReusingResources();
 			if (currentBuffered != null) {
 				currentBuffered.open();
 			}
@@ -550,7 +550,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 					"Pushing back current alignment buffers and feeding back new alignment data first.", taskName);
 
 			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
-			BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
+			BufferOrEventSequence bufferedNow = bufferStorage.rollOverWithoutReusingResources();
 			if (bufferedNow != null) {
 				bufferedNow.open();
 				queuedBuffered.addFirst(currentBuffered);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 5a7c496..59877a0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -40,7 +40,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * The buffer spiller takes the buffers and events from a data stream and adds them to a spill file.
+ * The {@link BufferSpiller} takes the buffers and events from a data stream and adds them to a spill file.
  * After a number of elements have been spilled, the spiller can "roll over": It presents the spilled
  * elements as a readable sequence, and opens a new spill file.
  *
@@ -53,7 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 @Internal
 @Deprecated
-public class BufferSpiller implements BufferBlocker {
+public class BufferSpiller implements BufferStorage {
 
 	/** Size of header in bytes (see add method). */
 	static final int HEADER_SIZE = 9;
@@ -92,7 +92,7 @@ public class BufferSpiller implements BufferBlocker {
 	private long bytesWritten;
 
 	/**
-	 * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
+	 * Creates a new {@link BufferSpiller}, spilling to one of the I/O manager's temp directories.
 	 *
 	 * @param ioManager The I/O manager for access to the temp directories.
 	 * @param pageSize The page size used to re-create spilled buffers.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
similarity index 84%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
index 4d0f66f..7d4dff0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
@@ -24,15 +24,15 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import java.io.IOException;
 
 /**
- * The buffer blocker takes the buffers and events from a data stream and adds them in a sequence.
- * After a number of elements have been added, the blocker can "roll over": It presents the added
- * elements as a readable sequence, and creates a new sequence.
+ * The {@link BufferStorage} takes the buffers and events from a data stream and adds them in a sequence.
+ * After a number of elements have been added, the {@link BufferStorage} can "roll over":
+ * It presents the added elements as a readable sequence, and creates a new sequence.
  */
 @Internal
-public interface BufferBlocker {
+public interface BufferStorage {
 
 	/**
-	 * Adds a buffer or event to the blocker.
+	 * Adds a buffer or event to the {@link BufferStorage}.
 	 *
 	 * @param boe The buffer or event to be added into the blocker.
 	 */
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
similarity index 84%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
index f91e8cc..e0a79c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
@@ -25,14 +25,16 @@ import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 
 /**
- * The cached buffer blocker takes the buffers and events from a data stream and adds them to a memory queue.
- * After a number of elements have been cached, the blocker can "roll over": It presents the cached
- * elements as a readable sequence, and creates a new memory queue.
+ * The {@link CachedBufferStorage} takes the buffers and events from a data stream and adds them to
+ * a memory queue. After a number of elements have been cached, the {@link CachedBufferStorage}
+ * can "roll over":
+ * It presents the cached elements as a readable sequence, and creates a new memory queue.
  *
- * <p>This buffer blocked can be used in credit-based flow control for better barrier alignment in exactly-once mode.
+ * <p>This {@link CachedBufferStorage} can be used in credit-based flow control for better barrier
+ * alignment in exactly-once mode.
  */
 @Internal
-public class CachedBufferBlocker implements BufferBlocker {
+public class CachedBufferStorage implements BufferStorage {
 
 	/** The page size, to estimate the total cached data size. */
 	private final int pageSize;
@@ -44,11 +46,11 @@ public class CachedBufferBlocker implements BufferBlocker {
 	private ArrayDeque<BufferOrEvent> currentBuffers;
 
 	/**
-	 * Creates a new buffer blocker, caching the buffers or events in memory queue.
+	 * Creates a new {@link CachedBufferStorage}, caching the buffers or events in memory queue.
 	 *
 	 * @param pageSize The page size used to estimate the cached size.
 	 */
-	public CachedBufferBlocker(int pageSize) {
+	public CachedBufferStorage(int pageSize) {
 		this.pageSize = pageSize;
 		this.currentBuffers = new ArrayDeque<BufferOrEvent>();
 	}
@@ -100,7 +102,7 @@ public class CachedBufferBlocker implements BufferBlocker {
 
 	/**
 	 * This class represents a sequence of cached buffers and events, created by the
-	 * {@link CachedBufferBlocker}.
+	 * {@link CachedBufferStorage}.
 	 */
 	public static class CachedBufferOrEventSequence implements BufferOrEventSequence {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index b77c7d0..289dd1a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -56,7 +56,7 @@ public class InputProcessorUtil {
 			if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
 				barrierHandler = new BarrierBuffer(
 					inputGate,
-					new CachedBufferBlocker(inputGate.getPageSize()),
+					new CachedBufferStorage(inputGate.getPageSize()),
 					maxAlign,
 					taskName);
 			} else {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index c9981b5..4bc05ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -58,7 +58,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer} with different {@link BufferBlocker} implements.
+ * Tests for the behavior of the {@link BarrierBuffer} with different {@link BufferStorage} implements.
  */
 public abstract class BarrierBufferTestBase {
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index b70ba24..4633154 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for {@link BufferSpiller}.
  */
-public class BufferSpillerTest extends BufferBlockerTestBase {
+public class BufferSpillerTest extends BufferStorageTestBase {
 
 	private static IOManager ioManager;
 
@@ -76,7 +76,7 @@ public class BufferSpillerTest extends BufferBlockerTestBase {
 	}
 
 	@Override
-	public BufferBlocker createBufferBlocker() {
+	public BufferStorage createBufferStorage() {
 		return spiller;
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
similarity index 93%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
index 4533a65..0485d88 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
@@ -39,20 +39,20 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for {@link BufferBlocker}.
+ * Tests for {@link BufferStorage}.
  */
-public abstract class BufferBlockerTestBase {
+public abstract class BufferStorageTestBase {
 
 	protected static final int PAGE_SIZE = 4096;
 
-	abstract BufferBlocker createBufferBlocker();
+	abstract BufferStorage createBufferStorage();
 
 	@Test
 	public void testRollOverEmptySequences() throws IOException {
-		BufferBlocker bufferBlocker = createBufferBlocker();
-		assertNull(bufferBlocker.rollOverReusingResources());
-		assertNull(bufferBlocker.rollOverReusingResources());
-		assertNull(bufferBlocker.rollOverReusingResources());
+		BufferStorage bufferStorage = createBufferStorage();
+		assertNull(bufferStorage.rollOverReusingResources());
+		assertNull(bufferStorage.rollOverReusingResources());
+		assertNull(bufferStorage.rollOverReusingResources());
 	}
 
 	@Test
@@ -63,7 +63,7 @@ public abstract class BufferBlockerTestBase {
 		final int maxNumEventsAndBuffers = 3000;
 		final int maxNumChannels = 1656;
 
-		BufferBlocker bufferBlocker = createBufferBlocker();
+		BufferStorage bufferStorage = createBufferStorage();
 
 		// do multiple spilling / rolling over rounds
 		for (int round = 0; round < 5; round++) {
@@ -86,13 +86,13 @@ public abstract class BufferBlockerTestBase {
 				} else {
 					evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
 				}
-				bufferBlocker.add(evt);
+				bufferStorage.add(evt);
 			}
 
 			// reset and create reader
 			bufferRnd.setSeed(bufferSeed);
 
-			BufferOrEventSequence seq = bufferBlocker.rollOverReusingResources();
+			BufferOrEventSequence seq = bufferStorage.rollOverReusingResources();
 			seq.open();
 
 			// read and validate the sequence
@@ -136,14 +136,14 @@ public abstract class BufferBlockerTestBase {
 		int currentNumEvents = 0;
 		int currentNumRecordAndEvents = 0;
 
-		BufferBlocker bufferBlocker = createBufferBlocker();
+		BufferStorage bufferStorage = createBufferStorage();
 
 		// do multiple spilling / rolling over rounds
 		for (int round = 0; round < 2 * sequences; round++) {
 
 			if (round % 2 == 1) {
 				// make this an empty sequence
-				assertNull(bufferBlocker.rollOverReusingResources());
+				assertNull(bufferStorage.rollOverReusingResources());
 			} else {
 				// proper spilled sequence
 				final long bufferSeed = rnd.nextLong();
@@ -167,7 +167,7 @@ public abstract class BufferBlockerTestBase {
 						} else {
 							evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
 						}
-						bufferBlocker.add(evt);
+						bufferStorage.add(evt);
 						generated++;
 					} else {
 						// consume a record
@@ -205,7 +205,7 @@ public abstract class BufferBlockerTestBase {
 
 				// done generating a sequence. queue it for consumption
 				bufferRnd.setSeed(bufferSeed);
-				BufferOrEventSequence seq = bufferBlocker.rollOverReusingResources();
+				BufferOrEventSequence seq = bufferStorage.rollOverReusingResources();
 
 				SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numberOfChannels);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java
similarity index 73%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java
index e7bf128..d1e2cf4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java
@@ -24,30 +24,30 @@ import org.junit.Before;
 import java.io.IOException;
 
 /**
- * Tests for {@link CachedBufferBlocker}.
+ * Tests for {@link CachedBufferStorage}.
  */
-public class CachedBufferBlockerTest extends BufferBlockerTestBase {
+public class CachedBufferStorageTest extends BufferStorageTestBase {
 
-	private CachedBufferBlocker bufferBlocker;
+	private CachedBufferStorage bufferStorage;
 
 	// ------------------------------------------------------------------------
 	//  Setup / Cleanup
 	// ------------------------------------------------------------------------
 
 	@Before
-	public void createBlocker() {
-		bufferBlocker = new CachedBufferBlocker(PAGE_SIZE);
+	public void createStorage() {
+		bufferStorage = new CachedBufferStorage(PAGE_SIZE);
 	}
 
 	@After
-	public void cleanupBlocker() throws IOException {
-		if (bufferBlocker != null) {
-			bufferBlocker.close();
+	public void cleanupStorage() throws IOException {
+		if (bufferStorage != null) {
+			bufferStorage.close();
 		}
 	}
 
 	@Override
-	public BufferBlocker createBufferBlocker() {
-		return  bufferBlocker;
+	public BufferStorage createBufferStorage() {
+		return  bufferStorage;
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
index e9c87ed..da88ffb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
@@ -26,13 +26,13 @@ import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferBlocker}.
+ * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferStorage}.
  */
 public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase {
 
 	@Override
 	public BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException {
-		return new BarrierBuffer(gate, new CachedBufferBlocker(PAGE_SIZE));
+		return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE));
 	}
 
 	@Override