You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/04 08:25:50 UTC

[flink] 03/04: [hotfix][network] Remove getMemorySegmentSize method from BufferProvider interface

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 40048355b5a7ec442b321a4ed4c84a4ab791d74a
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Jun 6 15:57:19 2019 +0800

    [hotfix][network] Remove getMemorySegmentSize method from BufferProvider interface
---
 .../runtime/io/network/buffer/BufferProvider.java  |  7 --
 .../runtime/io/network/buffer/LocalBufferPool.java |  5 --
 .../runtime/io/network/util/TestBufferFactory.java |  4 --
 .../io/network/util/TestPooledBufferProvider.java  |  5 --
 .../consumer/StreamTestSingleInputGate.java        | 82 ----------------------
 5 files changed, 103 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
index 843a2f6..f652d6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -63,11 +63,4 @@ public interface BufferProvider {
 	 * Returns whether the buffer provider has been destroyed.
 	 */
 	boolean isDestroyed();
-
-	/**
-	 * Returns the size of the underlying memory segments. This is the maximum size a {@link Buffer}
-	 * instance can have.
-	 */
-	int getMemorySegmentSize();
-
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index dfad5b9..e65b6f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -165,11 +165,6 @@ class LocalBufferPool implements BufferPool {
 	}
 
 	@Override
-	public int getMemorySegmentSize() {
-		return networkBufferPool.getMemorySegmentSize();
-	}
-
-	@Override
 	public int getNumberOfRequiredMemorySegments() {
 		return numberOfRequiredMemorySegments;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index 406b81f..f6b2fa8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -64,10 +64,6 @@ public class TestBufferFactory {
 		return numberOfCreatedBuffers;
 	}
 
-	public synchronized int getBufferSize() {
-		return bufferSize;
-	}
-
 	// ------------------------------------------------------------------------
 	// Static test helpers
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index eba5912..bae0830 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -96,11 +96,6 @@ public class TestPooledBufferProvider implements BufferProvider {
 		return false;
 	}
 
-	@Override
-	public int getMemorySegmentSize() {
-		return bufferFactory.getBufferSize();
-	}
-
 	public int getNumberOfAvailableBuffers() {
 		return buffers.size();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 0ca68a9..291e15c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,16 +21,12 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferListener;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -79,7 +75,6 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 		inputQueues = new ConcurrentLinkedQueue[numInputChannels];
 
 		setupInputChannels();
-		inputGate.setBufferPool(new NoOpBufferPool(bufferSize));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -226,81 +221,4 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			return isEvent;
 		}
 	}
-
-	private static class NoOpBufferPool implements BufferPool {
-		private int bufferSize;
-
-		public NoOpBufferPool(int bufferSize) {
-			this.bufferSize = bufferSize;
-		}
-
-		@Override
-		public void lazyDestroy() {
-		}
-
-		@Override
-		public int getMemorySegmentSize() {
-			return bufferSize;
-		}
-
-		@Override
-		public Buffer requestBuffer() throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Buffer requestBufferBlocking() throws IOException, InterruptedException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public boolean addBufferListener(BufferListener listener) {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public boolean isDestroyed() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int getNumberOfRequiredMemorySegments() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int getMaxNumberOfMemorySegments() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int getNumBuffers() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void setNumBuffers(int numBuffers) throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int getNumberOfAvailableMemorySegments() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int bestEffortGetNumOfUsedBuffers() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void recycle(MemorySegment memorySegment) {
-			throw new UnsupportedOperationException();
-		}
-	}
 }