You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:20 UTC

[flink] 11/16: [FLINK-12777][network] Introduce LinkedBufferStorage class

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 76b299359fcd6463d1e7c46fc64bfd42f787daef
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jun 18 15:57:02 2019 +0200

    [FLINK-12777][network] Introduce LinkedBufferStorage class
---
 .../streaming/runtime/io/LinkedBufferStorage.java  |  92 ++++++++++++
 .../runtime/io/BufferStorageTestBase.java          |   4 +
 .../runtime/io/LinkedBufferStorageTest.java        | 165 +++++++++++++++++++++
 3 files changed, 261 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
new file mode 100644
index 0000000..aac2ba6
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Implementation of {@link BufferStorage} that links two {@link BufferStorage} together.
+ * Each of the linked {@link BufferStorage} will store buffers independently, but they will be
+ * linked together for {@link #rollOver()} - if one is rolled over, other will do that as well.
+ *
+ * <p>Note that only {@code mainStorage} is closed when {@link LinkedBufferStorage} instance is closed.
+ */
+public class LinkedBufferStorage implements BufferStorage {
+
+	private final BufferStorage mainStorage;
+
+	private final BufferStorage linkedStorage;
+
+	private long maxBufferedBytes;
+
+	public LinkedBufferStorage(BufferStorage mainStorage, BufferStorage linkedStorage, long maxBufferedBytes) {
+		this.mainStorage = mainStorage;
+		this.linkedStorage = linkedStorage;
+		this.maxBufferedBytes = maxBufferedBytes;
+	}
+
+	@Override
+	public void add(BufferOrEvent boe) throws IOException {
+		mainStorage.add(boe);
+	}
+
+	@Override
+	public boolean isFull() {
+		return maxBufferedBytes > 0 && (getRolledBytes() + getPendingBytes()) > maxBufferedBytes;
+	}
+
+	@Override
+	public void rollOver() throws IOException {
+		mainStorage.rollOver();
+		linkedStorage.rollOver();
+	}
+
+	@Override
+	public long getPendingBytes() {
+		return mainStorage.getPendingBytes() + linkedStorage.getPendingBytes();
+	}
+
+	@Override
+	public long getRolledBytes() {
+		return mainStorage.getRolledBytes() + linkedStorage.getRolledBytes();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return mainStorage.isEmpty();
+	}
+
+	@Override
+	public Optional<BufferOrEvent> pollNext() throws IOException {
+		return mainStorage.pollNext();
+	}
+
+	@Override
+	public long getMaxBufferedBytes() {
+		return maxBufferedBytes;
+	}
+
+	@Override
+	public void close() throws IOException {
+		mainStorage.close();
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
index b23d3e9..1e219a5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
@@ -244,6 +244,10 @@ public abstract class BufferStorageTestBase {
 		return new BufferOrEvent(evt, channelIndex);
 	}
 
+	public static BufferOrEvent generateRandomBuffer(int size) {
+		return generateRandomBuffer(size, 0);
+	}
+
 	public static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
 		MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
 		for (int i = 0; i < size; i++) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorageTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorageTest.java
new file mode 100644
index 0000000..1edae8d
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorageTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Optional;
+
+import static junit.framework.TestCase.assertFalse;
+import static org.apache.flink.streaming.runtime.io.BufferStorageTestBase.generateRandomBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link LinkedBufferStorage}.
+ */
+public class LinkedBufferStorageTest {
+	private static final int PAGE_SIZE = 100;
+
+	private CachedBufferStorage mainStorage;
+
+	private CachedBufferStorage linkedStorage;
+
+	private LinkedBufferStorage bufferStorage;
+
+	@Before
+	public void setUp() {
+		mainStorage = new CachedBufferStorage(PAGE_SIZE);
+		linkedStorage = new CachedBufferStorage(PAGE_SIZE);
+		bufferStorage = new LinkedBufferStorage(
+			mainStorage,
+			linkedStorage,
+			700);
+	}
+
+	@After
+	public void tearDown() throws IOException {
+		bufferStorage.close();
+		mainStorage.close();
+		linkedStorage.close();
+	}
+
+	@Test
+	public void testBasicUsage() throws IOException {
+		linkedStorage.add(generateRandomBuffer(PAGE_SIZE + 0));
+		assertEquals(PAGE_SIZE, bufferStorage.getPendingBytes());
+		assertTrue(bufferStorage.isEmpty());
+
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE + 1));
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE + 2));
+
+		assertTrue(bufferStorage.isEmpty());
+		assertEquals(mainStorage.getPendingBytes() + linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+		assertEquals(mainStorage.getRolledBytes() + linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+		assertTrue(bufferStorage.isEmpty());
+		assertTrue(linkedStorage.isEmpty());
+
+		bufferStorage.rollOver();
+
+		assertFalse(bufferStorage.isEmpty());
+		assertFalse(linkedStorage.isEmpty());
+
+		assertEquals(mainStorage.getPendingBytes() + linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+		assertEquals(mainStorage.getRolledBytes() + linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+		linkedStorage.add(generateRandomBuffer(PAGE_SIZE + 3));
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE + 4));
+
+		assertEquals(mainStorage.getPendingBytes() + linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+		assertEquals(mainStorage.getRolledBytes() + linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+		bufferStorage.rollOver();
+
+		assertEquals(mainStorage.getPendingBytes() + linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+		assertEquals(mainStorage.getRolledBytes() + linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+		ArrayList<Integer> bufferSizes = drain(bufferStorage);
+
+		assertEquals(PAGE_SIZE + 4, (long) bufferSizes.get(0));
+		assertEquals(PAGE_SIZE + 1, (long) bufferSizes.get(1));
+		assertEquals(PAGE_SIZE + 2, (long) bufferSizes.get(2));
+
+		bufferSizes = drain(linkedStorage);
+
+		assertEquals(PAGE_SIZE + 3, (long) bufferSizes.get(0));
+		assertEquals(PAGE_SIZE + 0, (long) bufferSizes.get(1));
+
+		assertEquals(0, bufferStorage.getRolledBytes());
+		assertEquals(0, bufferStorage.getPendingBytes());
+	}
+
+	@Test
+	public void testPendingIsFull() throws IOException {
+		linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+		linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+		assertFalse(bufferStorage.isFull());
+
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+		assertTrue(bufferStorage.isFull());
+	}
+
+	/**
+	 * This test is broken because of FLINK-12912.
+	 * https://issues.apache.org/jira/browse/FLINK-12912
+	 */
+	//@Test
+	public void testRolledIsFull() throws IOException {
+		linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+		linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+		bufferStorage.rollOver();
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+		bufferStorage.rollOver();
+		linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+		assertFalse(bufferStorage.isFull());
+
+		bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+		assertTrue(bufferStorage.isFull());
+	}
+
+	private ArrayList<Integer> drain(BufferStorage bufferStorage) throws IOException {
+		ArrayList<Integer> result = new ArrayList<>();
+		while (!bufferStorage.isEmpty()) {
+			Optional<BufferOrEvent> bufferOrEvent = bufferStorage.pollNext();
+			if (bufferOrEvent.isPresent()) {
+				result.add(bufferOrEvent.get().getSize());
+			}
+		}
+		return result;
+	}
+}