You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/07 05:05:25 UTC
[incubator-ratis] branch master updated: RATIS-1201. Enhance
FileStore streaming tests (#320). Contributed by Rui Wang
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 365746f RATIS-1201. Enhance FileStore streaming tests (#320). Contributed by Rui Wang
365746f is described below
commit 365746f01368a8b3fe894f93831803dc9739c0e0
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Sun Dec 6 21:05:16 2020 -0800
RATIS-1201. Enhance FileStore streaming tests (#320). Contributed by Rui Wang
---
.../examples/filestore/FileStoreAsyncBaseTest.java | 34 ++-
.../examples/filestore/FileStoreBaseTest.java | 193 ++------------
.../filestore/FileStoreStreamingBaseTest.java | 101 ++++---
.../ratis/examples/filestore/FileStoreWriter.java | 290 +++++++++++++++++++++
4 files changed, 390 insertions(+), 228 deletions(-)
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
index f99d927..d70cfa6 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
@@ -22,7 +22,6 @@ import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.examples.filestore.FileStoreBaseTest.Writer;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.SizeInBytes;
@@ -72,10 +71,15 @@ public abstract class FileStoreAsyncBaseTest<CLUSTER extends MiniRaftCluster>
throws Exception {
LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
- new Writer(path, fileLength, executor, () -> client)
+ FileStoreWriter.newBuilder()
+ .setFileName(path)
+ .setFileSize(fileLength)
+ .setAsyncExecutor(executor)
+ .setFileStoreClientSupplier(() -> client)
+ .build()
.writeAsync()
- .thenCompose(Writer::verifyAsync)
- .thenCompose(Writer::deleteAsync)
+ .thenCompose(FileStoreWriter::verifyAsync)
+ .thenCompose(FileStoreWriter::deleteAsync)
.get();
}
@@ -85,25 +89,31 @@ public abstract class FileStoreAsyncBaseTest<CLUSTER extends MiniRaftCluster>
LOG.info("runTestMultipleFile with pathPrefix={}, numFile={}, fileLength={}",
pathPrefix, numFile, fileLength);
- final List<CompletableFuture<Writer>> writerFutures = new ArrayList<>();
+ final List<CompletableFuture<FileStoreWriter>> writerFutures = new ArrayList<>();
for (int i = 0; i < numFile; i++) {
final String path = String.format("%s%02d", pathPrefix, i);
- final Callable<CompletableFuture<Writer>> callable = LogUtils.newCallable(LOG,
- () -> new Writer(path, fileLength, executor, () -> client).writeAsync(),
+ final Callable<CompletableFuture<FileStoreWriter>> callable = LogUtils.newCallable(LOG,
+ () -> FileStoreWriter.newBuilder()
+ .setFileName(path)
+ .setFileSize(fileLength)
+ .setAsyncExecutor(executor)
+ .setFileStoreClientSupplier(() -> client)
+ .build()
+ .writeAsync(),
() -> path + ":" + fileLength);
writerFutures.add(callable.call());
}
- final List<Writer> writers = new ArrayList<>();
- for(CompletableFuture<Writer> f : writerFutures) {
+ final List<FileStoreWriter> writers = new ArrayList<>();
+ for(CompletableFuture<FileStoreWriter> f : writerFutures) {
writers.add(f.get());
}
writerFutures.clear();
- for (Writer w : writers) {
- writerFutures.add(w.verifyAsync().thenCompose(Writer::deleteAsync));
+ for (FileStoreWriter w : writers) {
+ writerFutures.add(w.verifyAsync().thenCompose(FileStoreWriter::deleteAsync));
}
- for(CompletableFuture<Writer> f : writerFutures) {
+ for(CompletableFuture<FileStoreWriter> f : writerFutures) {
f.get();
}
}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
index 198ea2b..cf01050 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
@@ -86,7 +86,12 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
throws Exception {
LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
- try (final Writer w = new Writer(path, fileLength, null, newClient)) {
+ try (final FileStoreWriter w =
+ FileStoreWriter.newBuilder()
+ .setFileName(path)
+ .setFileSize(fileLength)
+ .setFileStoreClientSupplier(newClient)
+ .build()) {
w.write().verify().delete();
}
}
@@ -99,196 +104,32 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
final ExecutorService executor = Executors.newFixedThreadPool(20);
- final List<Future<Writer>> writerFutures = new ArrayList<>();
+ final List<Future<FileStoreWriter>> writerFutures = new ArrayList<>();
for (int i = 0; i < numFile; i++) {
final String path = String.format("%s%02d", pathPrefix, i);
- final Callable<Writer> callable = LogUtils.newCallable(LOG,
- () -> new Writer(path, fileLength, null, newClient).write(),
+ final Callable<FileStoreWriter> callable = LogUtils.newCallable(LOG,
+ () -> FileStoreWriter.newBuilder()
+ .setFileName(path)
+ .setFileSize(fileLength)
+ .setFileStoreClientSupplier(newClient)
+ .build().write(),
() -> path + ":" + fileLength);
writerFutures.add(executor.submit(callable));
}
- final List<Writer> writers = new ArrayList<>();
- for(Future<Writer> f : writerFutures) {
+ final List<FileStoreWriter> writers = new ArrayList<>();
+ for(Future<FileStoreWriter> f : writerFutures) {
writers.add(f.get());
}
writerFutures.clear();
- for (Writer w : writers) {
+ for (FileStoreWriter w : writers) {
writerFutures.add(executor.submit(() -> w.verify().delete()));
}
- for(Future<Writer> f : writerFutures) {
+ for(Future<FileStoreWriter> f : writerFutures) {
f.get().close();
}
executor.shutdown();
}
-
- static class Writer implements Closeable {
- final long seed = ThreadLocalRandom.current().nextLong();
- final byte[] buffer = new byte[4 << 10];
-
- final String fileName;
- final SizeInBytes fileSize;
- final FileStoreClient client;
- final Executor asyncExecutor;
-
- Writer(String fileName, SizeInBytes fileSize, Executor asyncExecutor,
- CheckedSupplier<FileStoreClient, IOException> clientSupplier)
- throws IOException {
- this.fileName = fileName;
- this.fileSize = fileSize;
- this.client = clientSupplier.get();
- this.asyncExecutor = asyncExecutor;
- }
-
- ByteBuffer randomBytes(int length, Random random) {
- Preconditions.assertTrue(length <= buffer.length);
- random.nextBytes(buffer);
- final ByteBuffer b = ByteBuffer.wrap(buffer);
- b.limit(length);
- return b;
- }
-
- Writer write() throws IOException {
- final Random r = new Random(seed);
- final int size = fileSize.getSizeInt();
-
- for(int offset = 0; offset < size; ) {
- final int remaining = size - offset;
- final int length = Math.min(remaining, buffer.length);
- final boolean close = length == remaining;
-
- final ByteBuffer b = randomBytes(length, r);
-
- LOG.trace("write {}, offset={}, length={}, close? {}",
- fileName, offset, length, close);
- final long written = client.write(fileName, offset, close, b);
- Assert.assertEquals(length, written);
- offset += written;
- }
- return this;
- }
-
- CompletableFuture<Writer> writeAsync() {
- Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
- final Random r = new Random(seed);
- final int size = fileSize.getSizeInt();
-
- final CompletableFuture<Writer> returnFuture = new CompletableFuture<>();
- final AtomicInteger callCount = new AtomicInteger();
- final AtomicInteger n = new AtomicInteger();
- for(; n.get() < size; ) {
- final int offset = n.get();
- final int remaining = size - offset;
- final int length = Math.min(remaining, buffer.length);
- final boolean close = length == remaining;
-
- final ByteBuffer b = randomBytes(length, r);
-
- callCount.incrementAndGet();
- n.addAndGet(length);
-
- LOG.trace("writeAsync {}, offset={}, length={}, close? {}",
- fileName, offset, length, close);
- client.writeAsync(fileName, offset, close, b)
- .thenAcceptAsync(written -> Assert.assertEquals(length, (long)written), asyncExecutor)
- .thenRun(() -> {
- final int count = callCount.decrementAndGet();
- LOG.trace("writeAsync {}, offset={}, length={}, close? {}: n={}, callCount={}",
- fileName, offset, length, close, n.get(), count);
- if (n.get() == size && count == 0) {
- returnFuture.complete(this);
- }
- })
- .exceptionally(e -> {
- returnFuture.completeExceptionally(e);
- return null;
- });
- }
- return returnFuture;
- }
-
- Writer verify() throws IOException {
- final Random r = new Random(seed);
- final int size = fileSize.getSizeInt();
-
- for(int offset = 0; offset < size; ) {
- final int remaining = size - offset;
- final int n = Math.min(remaining, buffer.length);
- final ByteString read = client.read(fileName, offset, n);
- final ByteBuffer expected = randomBytes(n, r);
- verify(read, offset, n, expected);
- offset += n;
- }
- return this;
- }
-
- CompletableFuture<Writer> verifyAsync() {
- Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
- final Random r = new Random(seed);
- final int size = fileSize.getSizeInt();
-
- final CompletableFuture<Writer> returnFuture = new CompletableFuture<>();
- final AtomicInteger callCount = new AtomicInteger();
- final AtomicInteger n = new AtomicInteger();
- for(; n.get() < size; ) {
- final int offset = n.get();
- final int remaining = size - offset;
- final int length = Math.min(remaining, buffer.length);
-
- callCount.incrementAndGet();
- n.addAndGet(length);
- final ByteBuffer expected = ByteString.copyFrom(randomBytes(length, r)).asReadOnlyByteBuffer();
-
- client.readAsync(fileName, offset, length)
- .thenAcceptAsync(read -> verify(read, offset, length, expected), asyncExecutor)
- .thenRun(() -> {
- final int count = callCount.decrementAndGet();
- LOG.trace("verifyAsync {}, offset={}, length={}: n={}, callCount={}",
- fileName, offset, length, n.get(), count);
- if (n.get() == size && count == 0) {
- returnFuture.complete(this);
- }
- })
- .exceptionally(e -> {
- returnFuture.completeExceptionally(e);
- return null;
- });
- }
- Assert.assertEquals(size, n.get());
- return returnFuture;
- }
-
- void verify(ByteString read, int offset, int length, ByteBuffer expected) {
- Assert.assertEquals(length, read.size());
- assertBuffers(offset, length, expected, read.asReadOnlyByteBuffer());
- }
-
- CompletableFuture<Writer> deleteAsync() {
- Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
- return client.deleteAsync(fileName).thenApplyAsync(reply -> this, asyncExecutor);
- }
-
- Writer delete() throws IOException {
- client.delete(fileName);
- return this;
- }
-
- @Override
- public void close() throws IOException {
- client.close();
- }
- }
-
- static void assertBuffers(int offset, int length, ByteBuffer expected, ByteBuffer computed) {
- try {
- Assert.assertEquals(expected, computed);
- } catch(AssertionError e) {
- LOG.error("Buffer mismatched at offset=" + offset + ", length=" + length
- + "\n expected = " + StringUtils.bytes2HexString(expected)
- + "\n computed = " + StringUtils.bytes2HexString(computed), e);
- throw e;
- }
- }
}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
index 2e6f40d..598dc0c 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
@@ -19,16 +19,13 @@ package org.apache.ratis.examples.filestore;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.DataStreamTestUtils;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
-import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.function.CheckedSupplier;
import org.junit.Assert;
@@ -38,11 +35,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluster>
extends BaseTest
@@ -60,7 +59,7 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
static final int NUM_PEERS = 3;
@Test
- public void testFileStoreStream() throws Exception {
+ public void testFileStoreStreamSingleFile() throws Exception {
final CLUSTER cluster = newCluster(NUM_PEERS);
cluster.start();
RaftTestUtil.waitForLeader(cluster);
@@ -72,9 +71,30 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
final CheckedSupplier<FileStoreClient, IOException> newClient =
() -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
- // TODO: configurable buffer size
- final int bufferSize = 10_000;
- testSingleFile("foo", SizeInBytes.valueOf("2M"), bufferSize, newClient);
+
+ testSingleFile("foo", SizeInBytes.valueOf("2M"), 10_000, newClient);
+ testSingleFile("bar", SizeInBytes.valueOf("2M"), 1000, newClient);
+ testSingleFile("sar", SizeInBytes.valueOf("20M"), 100_000, newClient);
+
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testFileStoreStreamMultipleFiles() throws Exception {
+ final CLUSTER cluster = newCluster(NUM_PEERS);
+ cluster.start();
+ RaftTestUtil.waitForLeader(cluster);
+
+ final RaftGroup raftGroup = cluster.getGroup();
+ final Collection<RaftPeer> peers = raftGroup.getPeers();
+ Assert.assertEquals(NUM_PEERS, peers.size());
+ RaftPeer raftPeer = peers.iterator().next();
+
+ final CheckedSupplier<FileStoreClient, IOException> newClient =
+ () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+
+ testMultipleFiles("foo", 5, SizeInBytes.valueOf("2M"), 10_000, newClient);
+ testMultipleFiles("bar", 10, SizeInBytes.valueOf("2M"), 1000, newClient);
cluster.shutdown();
}
@@ -84,36 +104,37 @@ public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluste
IOException> newClient)
throws Exception {
LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
- final int size = fileLength.getSizeInt();
- try (FileStoreClient client = newClient.get()) {
- final DataStreamOutput dataStreamOutput = client.getStreamOutput(path, size);
- final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- final List<Integer> sizes = new ArrayList<>();
-
- for(int offset = 0; offset < size; ) {
- final int remaining = size - offset;
- final int length = Math.min(remaining, bufferSize);
- final boolean close = length == remaining;
-
- LOG.trace("write {}, offset={}, length={}, close? {}",
- path, offset, length, close);
- final ByteBuffer bf = DataStreamTestUtils.initBuffer(0, length);
- futures.add(dataStreamOutput.writeAsync(bf, close));
- sizes.add(length);
- offset += length;
- }
-
- DataStreamReply reply = dataStreamOutput.closeAsync().join();
- Assert.assertTrue(reply.isSuccess());
-
- // TODO: handle when any of the writeAsync has failed.
- // check writeAsync requests
- for (int i = 0; i < futures.size(); i++) {
- reply = futures.get(i).join();
- Assert.assertTrue(reply.isSuccess());
- Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
- Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? DataStreamPacketHeaderProto.Type.STREAM_DATA_SYNC : DataStreamPacketHeaderProto.Type.STREAM_DATA);
- }
+ FileStoreWriter.newBuilder()
+ .setFileName(path)
+ .setFileSize(fileLength)
+ .setBufferSize(bufferSize)
+ .setFileStoreClientSupplier(newClient)
+ .build().streamWriteAndVerify();
+ }
+
+ private void testMultipleFiles(String pathBase, int numFile, SizeInBytes fileLength,
+ int bufferSize, CheckedSupplier<FileStoreClient, IOException> newClient) throws Exception {
+ final ExecutorService executor = Executors.newFixedThreadPool(numFile);
+
+ final List<Future<FileStoreWriter>> writerFutures = new ArrayList<>();
+ for (int i = 0; i < numFile; i++) {
+ String path = pathBase + "-" + i;
+ final Callable<FileStoreWriter> callable = LogUtils.newCallable(LOG,
+ () -> FileStoreWriter.newBuilder()
+ .setFileName(path)
+ .setFileSize(fileLength)
+ .setBufferSize(bufferSize)
+ .setFileStoreClientSupplier(newClient)
+ .build().streamWriteAndVerify(),
+ () -> path);
+ writerFutures.add(executor.submit(callable));
+ }
+ for (Future<FileStoreWriter> future : writerFutures) {
+ future.get();
}
}
+
+ static class StreamWriter {
+
+ }
}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
new file mode 100644
index 0000000..343b0b1
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.datastream.DataStreamTestUtils;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.netty.util.internal.ThreadLocalRandom;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class FileStoreWriter implements Closeable {
+ public static final Logger LOG = LoggerFactory.getLogger(FileStoreWriter.class);
+
+ final long seed = ThreadLocalRandom.current().nextLong();
+ final byte[] buffer = new byte[4 << 10];
+
+ final String fileName;
+ final SizeInBytes fileSize;
+ final FileStoreClient client;
+ final Executor asyncExecutor;
+ final int bufferSize;
+
+ static Builder newBuilder() {
+ return new Builder();
+ }
+
+ static class Builder {
+ private String fileName;
+ private SizeInBytes fileSize;
+ private CheckedSupplier<FileStoreClient, IOException> clientSupplier;
+ private Executor asyncExecutor;
+ private int bufferSize;
+
+ public Builder setFileName(String fileName) {
+ this.fileName = fileName;
+ return this;
+ }
+
+ public Builder setFileSize(SizeInBytes size) {
+ this.fileSize = size;
+ return this;
+ }
+
+ public Builder setFileStoreClientSupplier(CheckedSupplier<FileStoreClient, IOException> supplier) {
+ this.clientSupplier = supplier;
+ return this;
+ }
+
+ public Builder setAsyncExecutor(Executor asyncExecutor) {
+ this.asyncExecutor = asyncExecutor;
+ return this;
+ }
+
+ public Builder setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ return this;
+ }
+
+ public FileStoreWriter build() throws IOException {
+ return new FileStoreWriter(fileName, fileSize, asyncExecutor, clientSupplier, bufferSize);
+ }
+ }
+
+ private FileStoreWriter(String fileName, SizeInBytes fileSize, Executor asyncExecutor,
+ CheckedSupplier<FileStoreClient, IOException> clientSupplier, int bufferSize)
+ throws IOException {
+ this.fileName = fileName;
+ this.fileSize = fileSize;
+ this.client = clientSupplier.get();
+ this.asyncExecutor = asyncExecutor;
+ this.bufferSize = bufferSize;
+ }
+
+ ByteBuffer randomBytes(int length, Random random) {
+ Preconditions.assertTrue(length <= buffer.length);
+ random.nextBytes(buffer);
+ final ByteBuffer b = ByteBuffer.wrap(buffer);
+ b.limit(length);
+ return b;
+ }
+
+ FileStoreWriter write() throws IOException {
+ final Random r = new Random(seed);
+ final int size = fileSize.getSizeInt();
+
+ for(int offset = 0; offset < size; ) {
+ final int remaining = size - offset;
+ final int length = Math.min(remaining, buffer.length);
+ final boolean close = length == remaining;
+
+ final ByteBuffer b = randomBytes(length, r);
+
+ LOG.trace("write {}, offset={}, length={}, close? {}",
+ fileName, offset, length, close);
+ final long written = client.write(fileName, offset, close, b);
+ Assert.assertEquals(length, written);
+ offset += written;
+ }
+ return this;
+ }
+
+ public FileStoreWriter streamWriteAndVerify() throws IOException {
+ final int size = fileSize.getSizeInt();
+ final DataStreamOutput dataStreamOutput = client.getStreamOutput(fileName, size);
+ final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+ final List<Integer> sizes = new ArrayList<>();
+
+ for(int offset = 0; offset < size; ) {
+ final int remaining = size - offset;
+ final int length = Math.min(remaining, bufferSize);
+ final boolean close = length == remaining;
+
+ LOG.trace("write {}, offset={}, length={}, close? {}",
+ fileName, offset, length, close);
+ final ByteBuffer bf = DataStreamTestUtils.initBuffer(0, length);
+ futures.add(dataStreamOutput.writeAsync(bf, close));
+ sizes.add(length);
+ offset += length;
+ }
+
+ DataStreamReply reply = dataStreamOutput.closeAsync().join();
+ Assert.assertTrue(reply.isSuccess());
+
+ // TODO: handle when any of the writeAsync has failed.
+ // check writeAsync requests
+ for (int i = 0; i < futures.size(); i++) {
+ reply = futures.get(i).join();
+ Assert.assertTrue(reply.isSuccess());
+ Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
+ Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA_SYNC : RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA);
+ }
+
+ return this;
+ }
+
+ CompletableFuture<FileStoreWriter> writeAsync() {
+ Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
+ final Random r = new Random(seed);
+ final int size = fileSize.getSizeInt();
+
+ final CompletableFuture<FileStoreWriter> returnFuture = new CompletableFuture<>();
+ final AtomicInteger callCount = new AtomicInteger();
+ final AtomicInteger n = new AtomicInteger();
+ for(; n.get() < size; ) {
+ final int offset = n.get();
+ final int remaining = size - offset;
+ final int length = Math.min(remaining, buffer.length);
+ final boolean close = length == remaining;
+
+ final ByteBuffer b = randomBytes(length, r);
+
+ callCount.incrementAndGet();
+ n.addAndGet(length);
+
+ LOG.trace("writeAsync {}, offset={}, length={}, close? {}",
+ fileName, offset, length, close);
+ client.writeAsync(fileName, offset, close, b)
+ .thenAcceptAsync(written -> Assert.assertEquals(length, (long)written), asyncExecutor)
+ .thenRun(() -> {
+ final int count = callCount.decrementAndGet();
+ LOG.trace("writeAsync {}, offset={}, length={}, close? {}: n={}, callCount={}",
+ fileName, offset, length, close, n.get(), count);
+ if (n.get() == size && count == 0) {
+ returnFuture.complete(this);
+ }
+ })
+ .exceptionally(e -> {
+ returnFuture.completeExceptionally(e);
+ return null;
+ });
+ }
+ return returnFuture;
+ }
+
+ FileStoreWriter verify() throws IOException {
+ final Random r = new Random(seed);
+ final int size = fileSize.getSizeInt();
+
+ for(int offset = 0; offset < size; ) {
+ final int remaining = size - offset;
+ final int n = Math.min(remaining, buffer.length);
+ final ByteString read = client.read(fileName, offset, n);
+ final ByteBuffer expected = randomBytes(n, r);
+ verify(read, offset, n, expected);
+ offset += n;
+ }
+ return this;
+ }
+
+ CompletableFuture<FileStoreWriter> verifyAsync() {
+ Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
+ final Random r = new Random(seed);
+ final int size = fileSize.getSizeInt();
+
+ final CompletableFuture<FileStoreWriter> returnFuture = new CompletableFuture<>();
+ final AtomicInteger callCount = new AtomicInteger();
+ final AtomicInteger n = new AtomicInteger();
+ for(; n.get() < size; ) {
+ final int offset = n.get();
+ final int remaining = size - offset;
+ final int length = Math.min(remaining, buffer.length);
+
+ callCount.incrementAndGet();
+ n.addAndGet(length);
+ final ByteBuffer expected = ByteString.copyFrom(randomBytes(length, r)).asReadOnlyByteBuffer();
+
+ client.readAsync(fileName, offset, length)
+ .thenAcceptAsync(read -> verify(read, offset, length, expected), asyncExecutor)
+ .thenRun(() -> {
+ final int count = callCount.decrementAndGet();
+ LOG.trace("verifyAsync {}, offset={}, length={}: n={}, callCount={}",
+ fileName, offset, length, n.get(), count);
+ if (n.get() == size && count == 0) {
+ returnFuture.complete(this);
+ }
+ })
+ .exceptionally(e -> {
+ returnFuture.completeExceptionally(e);
+ return null;
+ });
+ }
+ Assert.assertEquals(size, n.get());
+ return returnFuture;
+ }
+
+ void verify(ByteString read, int offset, int length, ByteBuffer expected) {
+ Assert.assertEquals(length, read.size());
+ assertBuffers(offset, length, expected, read.asReadOnlyByteBuffer());
+ }
+
+ CompletableFuture<FileStoreWriter> deleteAsync() {
+ Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
+ return client.deleteAsync(fileName).thenApplyAsync(reply -> this, asyncExecutor);
+ }
+
+ FileStoreWriter delete() throws IOException {
+ client.delete(fileName);
+ return this;
+ }
+
+ @Override
+ public void close() throws IOException {
+ client.close();
+ }
+
+ static void assertBuffers(int offset, int length, ByteBuffer expected, ByteBuffer computed) {
+ try {
+ Assert.assertEquals(expected, computed);
+ } catch(AssertionError e) {
+ LOG.error("Buffer mismatched at offset=" + offset + ", length=" + length
+ + "\n expected = " + StringUtils.bytes2HexString(expected)
+ + "\n computed = " + StringUtils.bytes2HexString(computed), e);
+ throw e;
+ }
+ }
+}