You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:20 UTC
[flink] 11/16: [FLINK-12777][network] Introduce LinkedBufferStorage
class
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 76b299359fcd6463d1e7c46fc64bfd42f787daef
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jun 18 15:57:02 2019 +0200
[FLINK-12777][network] Introduce LinkedBufferStorage class
---
.../streaming/runtime/io/LinkedBufferStorage.java | 92 ++++++++++++
.../runtime/io/BufferStorageTestBase.java | 4 +
.../runtime/io/LinkedBufferStorageTest.java | 165 +++++++++++++++++++++
3 files changed, 261 insertions(+)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
new file mode 100644
index 0000000..aac2ba6
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Implementation of {@link BufferStorage} that links two {@link BufferStorage} together.
+ * Each of the linked {@link BufferStorage} will store buffers independently, but they will be
+ * linked together for {@link #rollOver()} - if one is rolled over, other will do that as well.
+ *
+ * <p>Note that only {@code mainStorage} is closed when {@link LinkedBufferStorage} instance is closed.
+ */
+public class LinkedBufferStorage implements BufferStorage {
+
+ private final BufferStorage mainStorage;
+
+ private final BufferStorage linkedStorage;
+
+ private long maxBufferedBytes;
+
+ public LinkedBufferStorage(BufferStorage mainStorage, BufferStorage linkedStorage, long maxBufferedBytes) {
+ this.mainStorage = mainStorage;
+ this.linkedStorage = linkedStorage;
+ this.maxBufferedBytes = maxBufferedBytes;
+ }
+
+ @Override
+ public void add(BufferOrEvent boe) throws IOException {
+ mainStorage.add(boe);
+ }
+
+ @Override
+ public boolean isFull() {
+ return maxBufferedBytes > 0 && (getRolledBytes() + getPendingBytes()) > maxBufferedBytes;
+ }
+
+ @Override
+ public void rollOver() throws IOException {
+ mainStorage.rollOver();
+ linkedStorage.rollOver();
+ }
+
+ @Override
+ public long getPendingBytes() {
+ return mainStorage.getPendingBytes() + linkedStorage.getPendingBytes();
+ }
+
+ @Override
+ public long getRolledBytes() {
+ return mainStorage.getRolledBytes() + linkedStorage.getRolledBytes();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return mainStorage.isEmpty();
+ }
+
+ @Override
+ public Optional<BufferOrEvent> pollNext() throws IOException {
+ return mainStorage.pollNext();
+ }
+
+ @Override
+ public long getMaxBufferedBytes() {
+ return maxBufferedBytes;
+ }
+
+ @Override
+ public void close() throws IOException {
+ mainStorage.close();
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
index b23d3e9..1e219a5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
@@ -244,6 +244,10 @@ public abstract class BufferStorageTestBase {
return new BufferOrEvent(evt, channelIndex);
}
+ public static BufferOrEvent generateRandomBuffer(int size) {
+ return generateRandomBuffer(size, 0);
+ }
+
public static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
for (int i = 0; i < size; i++) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorageTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorageTest.java
new file mode 100644
index 0000000..1edae8d
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorageTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Optional;
+
+import static junit.framework.TestCase.assertFalse;
+import static org.apache.flink.streaming.runtime.io.BufferStorageTestBase.generateRandomBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link LinkedBufferStorage}.
+ */
+public class LinkedBufferStorageTest {
+ private static final int PAGE_SIZE = 100;
+
+ private CachedBufferStorage mainStorage;
+
+ private CachedBufferStorage linkedStorage;
+
+ private LinkedBufferStorage bufferStorage;
+
+ @Before
+ public void setUp() {
+ mainStorage = new CachedBufferStorage(PAGE_SIZE);
+ linkedStorage = new CachedBufferStorage(PAGE_SIZE);
+ bufferStorage = new LinkedBufferStorage(
+ mainStorage,
+ linkedStorage,
+ 700);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ bufferStorage.close();
+ mainStorage.close();
+ linkedStorage.close();
+ }
+
+ @Test
+ public void testBasicUsage() throws IOException {
+ linkedStorage.add(generateRandomBuffer(PAGE_SIZE + 0));
+ assertEquals(PAGE_SIZE, bufferStorage.getPendingBytes());
+ assertTrue(bufferStorage.isEmpty());
+
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE + 1));
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE + 2));
+
+ assertTrue(bufferStorage.isEmpty());
+ assertEquals(mainStorage.getPendingBytes() + linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+ assertEquals(mainStorage.getRolledBytes() + linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+ assertTrue(bufferStorage.isEmpty());
+ assertTrue(linkedStorage.isEmpty());
+
+ bufferStorage.rollOver();
+
+ assertFalse(bufferStorage.isEmpty());
+ assertFalse(linkedStorage.isEmpty());
+
+ assertEquals(mainStorage.getPendingBytes() + linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+ assertEquals(mainStorage.getRolledBytes() + linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+ linkedStorage.add(generateRandomBuffer(PAGE_SIZE + 3));
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE + 4));
+
+ assertEquals(mainStorage.getPendingBytes() + linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+ assertEquals(mainStorage.getRolledBytes() + linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+ bufferStorage.rollOver();
+
+ assertEquals(mainStorage.getPendingBytes() + linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+ assertEquals(mainStorage.getRolledBytes() + linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+ ArrayList<Integer> bufferSizes = drain(bufferStorage);
+
+ assertEquals(PAGE_SIZE + 4, (long) bufferSizes.get(0));
+ assertEquals(PAGE_SIZE + 1, (long) bufferSizes.get(1));
+ assertEquals(PAGE_SIZE + 2, (long) bufferSizes.get(2));
+
+ bufferSizes = drain(linkedStorage);
+
+ assertEquals(PAGE_SIZE + 3, (long) bufferSizes.get(0));
+ assertEquals(PAGE_SIZE + 0, (long) bufferSizes.get(1));
+
+ assertEquals(0, bufferStorage.getRolledBytes());
+ assertEquals(0, bufferStorage.getPendingBytes());
+ }
+
+ @Test
+ public void testPendingIsFull() throws IOException {
+ linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+ linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+ assertFalse(bufferStorage.isFull());
+
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+ assertTrue(bufferStorage.isFull());
+ }
+
+ /**
+ * This test is broken because of FLINK-12912.
+ * https://issues.apache.org/jira/browse/FLINK-12912
+ */
+ //@Test
+ public void testRolledIsFull() throws IOException {
+ linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+ linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+ bufferStorage.rollOver();
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+ bufferStorage.rollOver();
+ linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+ assertFalse(bufferStorage.isFull());
+
+ bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+ assertTrue(bufferStorage.isFull());
+ }
+
+ private ArrayList<Integer> drain(BufferStorage bufferStorage) throws IOException {
+ ArrayList<Integer> result = new ArrayList<>();
+ while (!bufferStorage.isEmpty()) {
+ Optional<BufferOrEvent> bufferOrEvent = bufferStorage.pollNext();
+ if (bufferOrEvent.isPresent()) {
+ result.add(bufferOrEvent.get().getSize());
+ }
+ }
+ return result;
+ }
+}