You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:10 UTC
[flink] 01/16: [hotfix][network] Rename BufferBlocker to
BufferStorage
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ff8a94612c2e80189f9ba883e58c8fd599d4bb45
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jun 13 13:28:13 2019 +0200
[hotfix][network] Rename BufferBlocker to BufferStorage
---
.../flink/streaming/runtime/io/BarrierBuffer.java | 24 +++++++++----------
.../flink/streaming/runtime/io/BufferSpiller.java | 6 ++---
.../io/{BufferBlocker.java => BufferStorage.java} | 10 ++++----
...BufferBlocker.java => CachedBufferStorage.java} | 18 +++++++-------
.../streaming/runtime/io/InputProcessorUtil.java | 2 +-
.../runtime/io/BarrierBufferTestBase.java | 2 +-
.../streaming/runtime/io/BufferSpillerTest.java | 4 ++--
...kerTestBase.java => BufferStorageTestBase.java} | 28 +++++++++++-----------
...ockerTest.java => CachedBufferStorageTest.java} | 20 ++++++++--------
.../runtime/io/CreditBasedBarrierBufferTest.java | 4 ++--
10 files changed, 60 insertions(+), 58 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 63fa1ac..ad62360 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -64,7 +64,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private final int totalNumberOfInputChannels;
/** To utility to write blocked data to a file channel. */
- private final BufferBlocker bufferBlocker;
+ private final BufferStorage bufferStorage;
/**
* The pending blocked buffer/event sequences. Must be consumed before requesting further data
@@ -123,11 +123,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
* <p>There is no limit to how much data may be buffered during an alignment.
*
* @param inputGate The input gate to draw the buffers and events from.
- * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier.
+ * @param bufferStorage The storage to hold the buffers and events for blocked channels.
*/
@VisibleForTesting
- BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) {
- this (inputGate, bufferBlocker, -1, "Testing: No task associated");
+ BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
+ this (inputGate, bufferStorage, -1, "Testing: No task associated");
}
/**
@@ -138,11 +138,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
* checkpoint has been cancelled.
*
* @param inputGate The input gate to draw the buffers and events from.
- * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier.
+ * @param bufferStorage The storage to hold the buffers and events for blocked channels.
* @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
* @param taskName The task name for logging.
*/
- BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes, String taskName) {
+ BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage, long maxBufferedBytes, String taskName) {
checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
this.inputGate = inputGate;
@@ -150,7 +150,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
- this.bufferBlocker = checkNotNull(bufferBlocker);
+ this.bufferStorage = checkNotNull(bufferStorage);
this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();
this.taskName = taskName;
@@ -192,7 +192,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
BufferOrEvent bufferOrEvent = next.get();
if (isBlocked(bufferOrEvent.getChannelIndex())) {
// if the channel is blocked, we just store the BufferOrEvent
- bufferBlocker.add(bufferOrEvent);
+ bufferStorage.add(bufferOrEvent);
checkSizeLimit();
}
else if (bufferOrEvent.isBuffer()) {
@@ -436,7 +436,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
private void checkSizeLimit() throws Exception {
- if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
+ if (maxBufferedBytes > 0 && (numQueuedBytes + bufferStorage.getBytesBlocked()) > maxBufferedBytes) {
// exceeded our limit - abort this checkpoint
LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
taskName,
@@ -473,7 +473,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
@Override
public void cleanup() throws IOException {
- bufferBlocker.close();
+ bufferStorage.close();
if (currentBuffered != null) {
currentBuffered.cleanup();
}
@@ -538,7 +538,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (currentBuffered == null) {
// common case: no more buffered data
- currentBuffered = bufferBlocker.rollOverReusingResources();
+ currentBuffered = bufferStorage.rollOverReusingResources();
if (currentBuffered != null) {
currentBuffered.open();
}
@@ -550,7 +550,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
"Pushing back current alignment buffers and feeding back new alignment data first.", taskName);
// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
- BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
+ BufferOrEventSequence bufferedNow = bufferStorage.rollOverWithoutReusingResources();
if (bufferedNow != null) {
bufferedNow.open();
queuedBuffered.addFirst(currentBuffered);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 5a7c496..59877a0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -40,7 +40,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * The buffer spiller takes the buffers and events from a data stream and adds them to a spill file.
+ * The {@link BufferSpiller} takes the buffers and events from a data stream and adds them to a spill file.
* After a number of elements have been spilled, the spiller can "roll over": It presents the spilled
* elements as a readable sequence, and opens a new spill file.
*
@@ -53,7 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
@Internal
@Deprecated
-public class BufferSpiller implements BufferBlocker {
+public class BufferSpiller implements BufferStorage {
/** Size of header in bytes (see add method). */
static final int HEADER_SIZE = 9;
@@ -92,7 +92,7 @@ public class BufferSpiller implements BufferBlocker {
private long bytesWritten;
/**
- * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
+ * Creates a new {@link BufferSpiller}, spilling to one of the I/O manager's temp directories.
*
* @param ioManager The I/O manager for access to the temp directories.
* @param pageSize The page size used to re-create spilled buffers.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
similarity index 84%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
index 4d0f66f..7d4dff0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
@@ -24,15 +24,15 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import java.io.IOException;
/**
- * The buffer blocker takes the buffers and events from a data stream and adds them in a sequence.
- * After a number of elements have been added, the blocker can "roll over": It presents the added
- * elements as a readable sequence, and creates a new sequence.
+ * The {@link BufferStorage} takes the buffers and events from a data stream and adds them in a sequence.
+ * After a number of elements have been added, the {@link BufferStorage} can "roll over":
+ * It presents the added elements as a readable sequence, and creates a new sequence.
*/
@Internal
-public interface BufferBlocker {
+public interface BufferStorage {
/**
- * Adds a buffer or event to the blocker.
+ * Adds a buffer or event to the {@link BufferStorage}.
*
* @param boe The buffer or event to be added into the blocker.
*/
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
similarity index 84%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
index f91e8cc..e0a79c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
@@ -25,14 +25,16 @@ import javax.annotation.Nullable;
import java.util.ArrayDeque;
/**
- * The cached buffer blocker takes the buffers and events from a data stream and adds them to a memory queue.
- * After a number of elements have been cached, the blocker can "roll over": It presents the cached
- * elements as a readable sequence, and creates a new memory queue.
+ * The {@link CachedBufferStorage} takes the buffers and events from a data stream and adds them to
+ * a memory queue. After a number of elements have been cached, the {@link CachedBufferStorage}
+ * can "roll over":
+ * It presents the cached elements as a readable sequence, and creates a new memory queue.
*
- * <p>This buffer blocked can be used in credit-based flow control for better barrier alignment in exactly-once mode.
+ * <p>This {@link CachedBufferStorage} can be used in credit-based flow control for better barrier
+ * alignment in exactly-once mode.
*/
@Internal
-public class CachedBufferBlocker implements BufferBlocker {
+public class CachedBufferStorage implements BufferStorage {
/** The page size, to estimate the total cached data size. */
private final int pageSize;
@@ -44,11 +46,11 @@ public class CachedBufferBlocker implements BufferBlocker {
private ArrayDeque<BufferOrEvent> currentBuffers;
/**
- * Creates a new buffer blocker, caching the buffers or events in memory queue.
+ * Creates a new {@link CachedBufferStorage}, caching the buffers or events in memory queue.
*
* @param pageSize The page size used to estimate the cached size.
*/
- public CachedBufferBlocker(int pageSize) {
+ public CachedBufferStorage(int pageSize) {
this.pageSize = pageSize;
this.currentBuffers = new ArrayDeque<BufferOrEvent>();
}
@@ -100,7 +102,7 @@ public class CachedBufferBlocker implements BufferBlocker {
/**
* This class represents a sequence of cached buffers and events, created by the
- * {@link CachedBufferBlocker}.
+ * {@link CachedBufferStorage}.
*/
public static class CachedBufferOrEventSequence implements BufferOrEventSequence {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index b77c7d0..289dd1a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -56,7 +56,7 @@ public class InputProcessorUtil {
if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
barrierHandler = new BarrierBuffer(
inputGate,
- new CachedBufferBlocker(inputGate.getPageSize()),
+ new CachedBufferStorage(inputGate.getPageSize()),
maxAlign,
taskName);
} else {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index c9981b5..4bc05ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -58,7 +58,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
/**
- * Tests for the behavior of the {@link BarrierBuffer} with different {@link BufferBlocker} implements.
+ * Tests for the behavior of the {@link BarrierBuffer} with different {@link BufferStorage} implements.
*/
public abstract class BarrierBufferTestBase {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index b70ba24..4633154 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.fail;
/**
* Tests for {@link BufferSpiller}.
*/
-public class BufferSpillerTest extends BufferBlockerTestBase {
+public class BufferSpillerTest extends BufferStorageTestBase {
private static IOManager ioManager;
@@ -76,7 +76,7 @@ public class BufferSpillerTest extends BufferBlockerTestBase {
}
@Override
- public BufferBlocker createBufferBlocker() {
+ public BufferStorage createBufferStorage() {
return spiller;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
similarity index 93%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
index 4533a65..0485d88 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
@@ -39,20 +39,20 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
- * Tests for {@link BufferBlocker}.
+ * Tests for {@link BufferStorage}.
*/
-public abstract class BufferBlockerTestBase {
+public abstract class BufferStorageTestBase {
protected static final int PAGE_SIZE = 4096;
- abstract BufferBlocker createBufferBlocker();
+ abstract BufferStorage createBufferStorage();
@Test
public void testRollOverEmptySequences() throws IOException {
- BufferBlocker bufferBlocker = createBufferBlocker();
- assertNull(bufferBlocker.rollOverReusingResources());
- assertNull(bufferBlocker.rollOverReusingResources());
- assertNull(bufferBlocker.rollOverReusingResources());
+ BufferStorage bufferStorage = createBufferStorage();
+ assertNull(bufferStorage.rollOverReusingResources());
+ assertNull(bufferStorage.rollOverReusingResources());
+ assertNull(bufferStorage.rollOverReusingResources());
}
@Test
@@ -63,7 +63,7 @@ public abstract class BufferBlockerTestBase {
final int maxNumEventsAndBuffers = 3000;
final int maxNumChannels = 1656;
- BufferBlocker bufferBlocker = createBufferBlocker();
+ BufferStorage bufferStorage = createBufferStorage();
// do multiple spilling / rolling over rounds
for (int round = 0; round < 5; round++) {
@@ -86,13 +86,13 @@ public abstract class BufferBlockerTestBase {
} else {
evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
}
- bufferBlocker.add(evt);
+ bufferStorage.add(evt);
}
// reset and create reader
bufferRnd.setSeed(bufferSeed);
- BufferOrEventSequence seq = bufferBlocker.rollOverReusingResources();
+ BufferOrEventSequence seq = bufferStorage.rollOverReusingResources();
seq.open();
// read and validate the sequence
@@ -136,14 +136,14 @@ public abstract class BufferBlockerTestBase {
int currentNumEvents = 0;
int currentNumRecordAndEvents = 0;
- BufferBlocker bufferBlocker = createBufferBlocker();
+ BufferStorage bufferStorage = createBufferStorage();
// do multiple spilling / rolling over rounds
for (int round = 0; round < 2 * sequences; round++) {
if (round % 2 == 1) {
// make this an empty sequence
- assertNull(bufferBlocker.rollOverReusingResources());
+ assertNull(bufferStorage.rollOverReusingResources());
} else {
// proper spilled sequence
final long bufferSeed = rnd.nextLong();
@@ -167,7 +167,7 @@ public abstract class BufferBlockerTestBase {
} else {
evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
}
- bufferBlocker.add(evt);
+ bufferStorage.add(evt);
generated++;
} else {
// consume a record
@@ -205,7 +205,7 @@ public abstract class BufferBlockerTestBase {
// done generating a sequence. queue it for consumption
bufferRnd.setSeed(bufferSeed);
- BufferOrEventSequence seq = bufferBlocker.rollOverReusingResources();
+ BufferOrEventSequence seq = bufferStorage.rollOverReusingResources();
SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numberOfChannels);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java
similarity index 73%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java
index e7bf128..d1e2cf4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java
@@ -24,30 +24,30 @@ import org.junit.Before;
import java.io.IOException;
/**
- * Tests for {@link CachedBufferBlocker}.
+ * Tests for {@link CachedBufferStorage}.
*/
-public class CachedBufferBlockerTest extends BufferBlockerTestBase {
+public class CachedBufferStorageTest extends BufferStorageTestBase {
- private CachedBufferBlocker bufferBlocker;
+ private CachedBufferStorage bufferStorage;
// ------------------------------------------------------------------------
// Setup / Cleanup
// ------------------------------------------------------------------------
@Before
- public void createBlocker() {
- bufferBlocker = new CachedBufferBlocker(PAGE_SIZE);
+ public void createStorage() {
+ bufferStorage = new CachedBufferStorage(PAGE_SIZE);
}
@After
- public void cleanupBlocker() throws IOException {
- if (bufferBlocker != null) {
- bufferBlocker.close();
+ public void cleanupStorage() throws IOException {
+ if (bufferStorage != null) {
+ bufferStorage.close();
}
}
@Override
- public BufferBlocker createBufferBlocker() {
- return bufferBlocker;
+ public BufferStorage createBufferStorage() {
+ return bufferStorage;
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
index e9c87ed..da88ffb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
@@ -26,13 +26,13 @@ import java.io.IOException;
import static org.junit.Assert.assertEquals;
/**
- * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferBlocker}.
+ * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferStorage}.
*/
public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase {
@Override
public BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException {
- return new BarrierBuffer(gate, new CachedBufferBlocker(PAGE_SIZE));
+ return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE));
}
@Override