You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/28 06:15:01 UTC
[flink] 01/04: [FLINK-17861][tests][hotfix] Fix
ChannelStateCheckpointWriterTest.testRecordingOffsets: (1) Set writer
position in NetworkBuffer passed to ChannelStateCheckpointWriter.write; (2)
Reduce state size to fit in the configured MemoryCheckpointOutputStream.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 158ca26f97c884d1a27cff5b5fe3faacb363f1ac
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu May 21 21:29:28 2020 +0200
[FLINK-17861][tests][hotfix] Fix ChannelStateCheckpointWriterTest.testRecordingOffsets
1. Set writer position in NetworkBuffer passed to ChannelStateCheckpointWriter.write
2. Reduce state size to fit in the configured MemoryCheckpointOutputStream
---
.../checkpoint/channel/ChannelStateCheckpointWriterTest.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
index e0cf26a..e922229 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.checkpoint.channel;
import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
@@ -42,6 +44,7 @@ import java.util.Random;
import static org.apache.flink.core.fs.Path.fromLocalFile;
import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -147,7 +150,7 @@ public class ChannelStateCheckpointWriterTest {
Map<InputChannelInfo, Integer> offsetCounts = new HashMap<>();
offsetCounts.put(new InputChannelInfo(1, 1), 1);
offsetCounts.put(new InputChannelInfo(1, 2), 2);
- offsetCounts.put(new InputChannelInfo(1, 3), 99);
+ offsetCounts.put(new InputChannelInfo(1, 3), 5);
ChannelStateWriteResult result = new ChannelStateWriteResult();
ChannelStateCheckpointWriter writer = createWriter(result);
@@ -172,8 +175,8 @@ public class ChannelStateCheckpointWriterTest {
}
private void write(ChannelStateCheckpointWriter writer, InputChannelInfo channelInfo, byte[] data) throws Exception {
- NetworkBuffer buffer = new NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(data.length, null), FreeingBufferRecycler.INSTANCE);
- buffer.setBytes(0, data);
+ MemorySegment segment = wrap(data);
+ NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, segment.size());
writer.writeInput(channelInfo, buffer);
}