You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/28 06:15:01 UTC

[flink] 01/04: [FLINK-17861][tests][hotfix] Fix ChannelStateCheckpointWriterTest.testRecordingOffsets: (1) set the writer position in the NetworkBuffer passed to ChannelStateCheckpointWriter.write; (2) reduce the state size to fit in the configured MemoryCheckpointOutputStream

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 158ca26f97c884d1a27cff5b5fe3faacb363f1ac
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 21 21:29:28 2020 +0200

    [FLINK-17861][tests][hotfix] Fix ChannelStateCheckpointWriterTest.testRecordingOffsets
    1. Set writer position in NetworkBuffer passed to ChannelStateCheckpointWriter.write
    2. Reduce state size to fit in the configured MemoryCheckpointOutputStream
---
 .../checkpoint/channel/ChannelStateCheckpointWriterTest.java     | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
index e0cf26a..e922229 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
@@ -18,7 +18,9 @@
 package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
@@ -42,6 +44,7 @@ import java.util.Random;
 
 import static org.apache.flink.core.fs.Path.fromLocalFile;
 import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -147,7 +150,7 @@ public class ChannelStateCheckpointWriterTest {
 		Map<InputChannelInfo, Integer> offsetCounts = new HashMap<>();
 		offsetCounts.put(new InputChannelInfo(1, 1), 1);
 		offsetCounts.put(new InputChannelInfo(1, 2), 2);
-		offsetCounts.put(new InputChannelInfo(1, 3), 99);
+		offsetCounts.put(new InputChannelInfo(1, 3), 5);
 
 		ChannelStateWriteResult result = new ChannelStateWriteResult();
 		ChannelStateCheckpointWriter writer = createWriter(result);
@@ -172,8 +175,8 @@ public class ChannelStateCheckpointWriterTest {
 	}
 
 	private void write(ChannelStateCheckpointWriter writer, InputChannelInfo channelInfo, byte[] data) throws Exception {
-		NetworkBuffer buffer = new NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(data.length, null), FreeingBufferRecycler.INSTANCE);
-		buffer.setBytes(0, data);
+		MemorySegment segment = wrap(data);
+		NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, segment.size());
 		writer.writeInput(channelInfo, buffer);
 	}