You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/04 08:25:50 UTC
[flink] 03/04: [hotfix][network] Remove getMemorySegmentSize method
from BufferProvider interface
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 40048355b5a7ec442b321a4ed4c84a4ab791d74a
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Jun 6 15:57:19 2019 +0800
[hotfix][network] Remove getMemorySegmentSize method from BufferProvider interface
---
.../runtime/io/network/buffer/BufferProvider.java | 7 --
.../runtime/io/network/buffer/LocalBufferPool.java | 5 --
.../runtime/io/network/util/TestBufferFactory.java | 4 --
.../io/network/util/TestPooledBufferProvider.java | 5 --
.../consumer/StreamTestSingleInputGate.java | 82 ----------------------
5 files changed, 103 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
index 843a2f6..f652d6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -63,11 +63,4 @@ public interface BufferProvider {
* Returns whether the buffer provider has been destroyed.
*/
boolean isDestroyed();
-
- /**
- * Returns the size of the underlying memory segments. This is the maximum size a {@link Buffer}
- * instance can have.
- */
- int getMemorySegmentSize();
-
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index dfad5b9..e65b6f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -165,11 +165,6 @@ class LocalBufferPool implements BufferPool {
}
@Override
- public int getMemorySegmentSize() {
- return networkBufferPool.getMemorySegmentSize();
- }
-
- @Override
public int getNumberOfRequiredMemorySegments() {
return numberOfRequiredMemorySegments;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index 406b81f..f6b2fa8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -64,10 +64,6 @@ public class TestBufferFactory {
return numberOfCreatedBuffers;
}
- public synchronized int getBufferSize() {
- return bufferSize;
- }
-
// ------------------------------------------------------------------------
// Static test helpers
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index eba5912..bae0830 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -96,11 +96,6 @@ public class TestPooledBufferProvider implements BufferProvider {
return false;
}
- @Override
- public int getMemorySegmentSize() {
- return bufferFactory.getBufferSize();
- }
-
public int getNumberOfAvailableBuffers() {
return buffers.size();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 0ca68a9..291e15c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,16 +21,12 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferListener;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -79,7 +75,6 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
inputQueues = new ConcurrentLinkedQueue[numInputChannels];
setupInputChannels();
- inputGate.setBufferPool(new NoOpBufferPool(bufferSize));
}
@SuppressWarnings("unchecked")
@@ -226,81 +221,4 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
return isEvent;
}
}
-
- private static class NoOpBufferPool implements BufferPool {
- private int bufferSize;
-
- public NoOpBufferPool(int bufferSize) {
- this.bufferSize = bufferSize;
- }
-
- @Override
- public void lazyDestroy() {
- }
-
- @Override
- public int getMemorySegmentSize() {
- return bufferSize;
- }
-
- @Override
- public Buffer requestBuffer() throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Buffer requestBufferBlocking() throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean addBufferListener(BufferListener listener) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isDestroyed() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getNumberOfRequiredMemorySegments() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getMaxNumberOfMemorySegments() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getNumBuffers() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setNumBuffers(int numBuffers) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getNumberOfAvailableMemorySegments() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int bestEffortGetNumOfUsedBuffers() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void recycle(MemorySegment memorySegment) {
- throw new UnsupportedOperationException();
- }
- }
}