You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/02/12 02:07:34 UTC
incubator-ratis git commit: RATIS-170. Add async support to the
FileStore example.
Repository: incubator-ratis
Updated Branches:
refs/heads/master bb5c258c3 -> 3e69b13a2
RATIS-170. Add async support to the FileStore example.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/3e69b13a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/3e69b13a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/3e69b13a
Branch: refs/heads/master
Commit: 3e69b13a2bfd55c56e0c5f2d310601dcfc46731e
Parents: bb5c258
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Feb 12 10:07:03 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Feb 12 10:07:03 2018 +0800
----------------------------------------------------------------------
.../java/org/apache/ratis/util/JavaUtils.java | 11 ++
.../java/org/apache/ratis/util/LogUtils.java | 2 +-
.../ratis/examples/filestore/FileInfo.java | 80 +++++++++---
.../examples/filestore/FileStoreClient.java | 80 +++++++++---
.../filestore/FileStoreAsyncBaseTest.java | 110 ++++++++++++++++
.../examples/filestore/FileStoreBaseTest.java | 126 +++++++++++++++----
.../filestore/TestFileStoreAsyncWithGrpc.java | 25 ++++
7 files changed, 379 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 8322209..926bab8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -200,4 +200,15 @@ public interface JavaUtils {
static <T> CompletableFuture<Void> allOf(List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]));
}
+
+ static <OUTPUT, THROWABLE extends Throwable> OUTPUT supplyAndWrapAsCompletionException( // Invokes the supplier and returns its result, converting non-runtime failures to CompletionException.
+ CheckedSupplier<OUTPUT, THROWABLE> supplier) {
+ try {
+ return supplier.get();
+ } catch (RuntimeException e) {
+ throw e; // Unchecked exceptions propagate unchanged.
+ } catch (Throwable t) {
+ throw new CompletionException(t); // Everything else (checked exceptions, Errors) becomes the cause of a CompletionException.
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
index e75b89b..376fed5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
@@ -80,7 +80,7 @@ public interface LogUtils {
}
if (log.isTraceEnabled()) {
- log.trace("Successfully supplied " + name.get());
+ log.trace("Successfully supplied " + name.get() + ": " + output);
}
return output;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index d8b3eb4..636234f 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -35,8 +35,10 @@ import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -123,6 +125,33 @@ abstract class FileInfo {
}
}
+ static class WriteInfo { // Immutable holder: the future of one write, a commit future created here, and a previous index.
+ /** Future to make sure that each commit is executed after the corresponding write. */
+ private final CompletableFuture<Integer> writeFuture;
+ /** Future to make sure that each commit is executed after the previous commit. */
+ private final CompletableFuture<Integer> commitFuture;
+ /** Previous commit index. */
+ private final long previousIndex;
+
+ WriteInfo(CompletableFuture<Integer> writeFuture, long previousIndex) {
+ this.writeFuture = writeFuture;
+ this.commitFuture = new CompletableFuture<>(); // Starts incomplete; nothing inside this class completes it.
+ this.previousIndex = previousIndex;
+ }
+
+ CompletableFuture<Integer> getCommitFuture() {
+ return commitFuture;
+ }
+
+ CompletableFuture<Integer> getWriteFuture() {
+ return writeFuture;
+ }
+
+ long getPreviousIndex() {
+ return previousIndex;
+ }
+ }
+
static class UnderConstruction extends FileInfo {
private FileOut out;
@@ -135,10 +164,9 @@ abstract class FileInfo {
/** A queue to make sure that the writes are in order. */
private final TaskQueue writeQueue = new TaskQueue("writeQueue");
- /** A queue to make sure that the commits are in order. */
- private final TaskQueue commitQueue = new TaskQueue("commitQueue");
- /** Futures to make sure that each commit is executed the corresponding write. */
- private final Map<Long, CompletableFuture<Integer>> writeFutures = new ConcurrentHashMap<>();
+ private final Map<Long, WriteInfo> writeInfos = new ConcurrentHashMap<>();
+
+ private final AtomicLong lastWriteIndex = new AtomicLong(-1L);
UnderConstruction(Path relativePath) {
super(relativePath);
@@ -179,11 +207,12 @@ abstract class FileInfo {
}
private CompletableFuture<Integer> submitWrite(
- CheckedSupplier<Integer, IOException> task, ExecutorService executor,
- RaftPeerId id, long index) {
+ CheckedSupplier<Integer, IOException> task,
+ ExecutorService executor, RaftPeerId id, long index) {
final CompletableFuture<Integer> f = writeQueue.submit(task, executor,
e -> new IOException("Failed " + task, e));
- CollectionUtils.putNew(index, f, writeFutures, () -> id + ":writeFutures");
+ final WriteInfo info = new WriteInfo(f, lastWriteIndex.getAndSet(index));
+ CollectionUtils.putNew(index, info, writeInfos, () -> id + ":writeInfos");
return f;
}
@@ -230,10 +259,18 @@ abstract class FileInfo {
}
CompletableFuture<Integer> submitCommit(
- long offset, int size, Function<UnderConstruction, ReadOnly> converter,
+ long offset, int size, Function<UnderConstruction, ReadOnly> closeFunction,
ExecutorService executor, RaftPeerId id, long index) {
+ final boolean close = closeFunction != null;
final Supplier<String> name = () -> "commit(" + getRelativePath() + ", "
- + offset + ", " + size + ") @" + id + ":" + index;
+ + offset + ", " + size + ", close? " + close + ") @" + id + ":" + index;
+
+ final WriteInfo info = writeInfos.get(index);
+ if (info == null) {
+ return JavaUtils.completeExceptionally(
+ new IOException(name.get() + " is already committed."));
+ }
+
final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
if (offset != committedSize) {
throw new IOException("Offset/size mismatched: offset = "
@@ -245,21 +282,26 @@ abstract class FileInfo {
}
committedSize += size;
- if (converter != null) {
- converter.apply(this);
+ if (close) {
+ closeFunction.apply(this);
+ writeInfos.remove(index);
}
+ info.getCommitFuture().complete(size);
return size;
}, name);
- final CompletableFuture<Integer> write = writeFutures.remove(index);
- if (write == null) {
- return JavaUtils.completeExceptionally(
- new IOException(name.get() + " is already committed."));
- }
- return write.thenComposeAsync(writeSize -> {
+ // Remove previous info, if there is any.
+ final WriteInfo previous = writeInfos.remove(info.getPreviousIndex());
+ final CompletableFuture<Integer> previousCommit = previous != null?
+ previous.getCommitFuture(): CompletableFuture.completedFuture(0);
+ // Commit after both current write and previous commit completed.
+ return info.getWriteFuture().thenCombineAsync(previousCommit, (writeSize, previousCommitSize) -> {
Preconditions.assertTrue(size == writeSize);
- return commitQueue.submit(task, executor,
- e -> new IOException("Failed " + task, e));
+ try {
+ return task.get();
+ } catch (IOException e) {
+ throw new CompletionException("Failed " + task, e);
+ }
}, executor);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index de794c2..a8f141c 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.ExamplesProtos.*;
import org.apache.ratis.util.CheckedFunction;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.slf4j.Logger;
@@ -34,6 +35,9 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Function;
/** A standalone server using raft with a configurable state machine. */
public class FileStoreClient implements Closeable {
@@ -62,10 +66,24 @@ public class FileStoreClient implements Closeable {
if (sme != null) {
throw new IOException("Failed to send request " + request, sme);
}
- Preconditions.assertTrue(reply.isSuccess(), () -> "reply=" + reply);
+ Preconditions.assertTrue(reply.isSuccess(), () -> "Failed " + request + ", reply=" + reply);
return reply.getMessage().getContent();
}
+ static CompletableFuture<ByteString> sendAsync( // Sends the request through sendFunction and maps the reply future to the reply's message content.
+ ByteString request, Function<Message, CompletableFuture<RaftClientReply>> sendFunction)
+ {
+ return sendFunction.apply(() -> request // The lambda is the Message handed to sendFunction; it yields the request bytes.
+ ).thenApply(reply -> {
+ final StateMachineException sme = reply.getStateMachineException();
+ if (sme != null) {
+ throw new CompletionException("Failed to send request " + request, sme); // Unchecked, so it can be thrown from the lambda; sme is kept as the cause.
+ }
+ Preconditions.assertTrue(reply.isSuccess(), () -> "Failed " + request + ", reply=" + reply);
+ return reply.getMessage().getContent();
+ });
+ }
+
private ByteString send(ByteString request) throws IOException {
return send(request, client::send);
}
@@ -74,30 +92,55 @@ public class FileStoreClient implements Closeable {
return send(request, client::sendReadOnly);
}
+ private CompletableFuture<ByteString> sendAsync(ByteString request) { // Convenience overload bound to client::sendAsync.
+ return sendAsync(request, client::sendAsync);
+ }
+
+ private CompletableFuture<ByteString> sendReadOnlyAsync(ByteString request) { // Same as sendAsync(request) but bound to client::sendReadOnlyAsync.
+ return sendAsync(request, client::sendReadOnlyAsync);
+ }
+
public ByteString read(String path, long offset, long length) throws IOException {
- return readImpl(path, offset, length).getData();
+ final ByteString reply = readImpl(this::sendReadOnly, path, offset, length);
+ return ReadReplyProto.parseFrom(reply).getData();
}
- private ReadReplyProto readImpl(String path, long offset, long length) throws IOException {
+ public CompletableFuture<ByteString> readAsync(String path, long offset, long length) { // Asynchronous read: the future yields the data field of the parsed ReadReplyProto.
+ return readImpl(this::sendReadOnlyAsync, path, offset, length
+ ).thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException( // A non-runtime failure while parsing is rethrown wrapped in a CompletionException.
+ () -> ReadReplyProto.parseFrom(reply).getData()));
+ }
+
+ private static <OUTPUT, THROWABLE extends Throwable> OUTPUT readImpl(
+ CheckedFunction<ByteString, OUTPUT, THROWABLE> sendReadOnlyFunction,
+ String path, long offset, long length) throws THROWABLE {
final ReadRequestProto read = ReadRequestProto.newBuilder()
.setPath(ProtoUtils.toByteString(path))
.setOffset(offset)
.setLength(length)
.build();
- return ReadReplyProto.parseFrom(sendReadOnly(read.toByteString()));
+ return sendReadOnlyFunction.apply(read.toByteString());
}
public long write(String path, long offset, boolean close, ByteBuffer buffer)
throws IOException {
final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining());
buffer.limit(chunkSize);
- final WriteReplyProto proto = writeImpl(path, offset, close, ByteString.copyFrom(buffer));
- return proto.getLength();
+ final ByteString reply = writeImpl(this::send, path, offset, close, buffer);
+ return WriteReplyProto.parseFrom(reply).getLength();
}
- private WriteReplyProto writeImpl(String path, long offset, boolean close, ByteString data)
- throws IOException {
+ public CompletableFuture<Long> writeAsync(String path, long offset, boolean close, ByteBuffer buffer) { // Asynchronous write: the future yields the length field of the parsed WriteReplyProto.
+ return writeImpl(this::sendAsync, path, offset, close, buffer // NOTE(review): unlike write(...), the buffer is not capped with FileStoreCommon.getChunkSize first; confirm this is intended.
+ ).thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException( // A non-runtime failure while parsing is rethrown wrapped in a CompletionException.
+ () -> WriteReplyProto.parseFrom(reply).getLength()));
+ }
+
+ private static <OUTPUT, THROWABLE extends Throwable> OUTPUT writeImpl(
+ CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction,
+ String path, long offset, boolean close, ByteBuffer data)
+ throws THROWABLE {
final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
.setPath(ProtoUtils.toByteString(path))
.setOffset(offset)
@@ -105,20 +148,29 @@ public class FileStoreClient implements Closeable {
final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
.setHeader(header)
- .setData(data);
+ .setData(ByteString.copyFrom(data));
final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setWrite(write).build();
- return WriteReplyProto.parseFrom(send(request.toByteString()));
+ return sendFunction.apply(request.toByteString());
}
- private DeleteReplyProto deleteImpl(String path) throws IOException {
+ private static <OUTPUT, THROWABLE extends Throwable> OUTPUT deleteImpl(
+ CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction, String path)
+ throws THROWABLE {
final DeleteRequestProto.Builder delete = DeleteRequestProto.newBuilder()
.setPath(ProtoUtils.toByteString(path));
final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setDelete(delete).build();
- return DeleteReplyProto.parseFrom(send(request.toByteString()));
+ return sendFunction.apply(request.toByteString());
+ }
+
+ public String delete(String path) throws IOException {
+ final ByteString reply = deleteImpl(this::send, path);
+ return DeleteReplyProto.parseFrom(reply).getResolvedPath().toStringUtf8();
}
- public void delete(String path) throws IOException {
- deleteImpl(path);
+ public CompletableFuture<String> deleteAsync(String path) {
+ return deleteImpl(this::sendAsync, path
+ ).thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException(
+ () -> DeleteReplyProto.parseFrom(reply).getResolvedPath().toStringUtf8()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
new file mode 100644
index 0000000..7cf8e3a
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.filestore.FileStoreBaseTest.Writer;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+public abstract class FileStoreAsyncBaseTest<CLUSTER extends MiniRaftCluster> // Base test that exercises the FileStore example through FileStoreClient's async methods.
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+ public static final Logger LOG = LoggerFactory.getLogger(FileStoreAsyncBaseTest.class);
+
+ { // Instance initializer: selects FileStoreStateMachine and points its directory at <classTestDir>/filestore.
+ final RaftProperties p = getProperties();
+ p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ FileStoreStateMachine.class, StateMachine.class);
+ ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
+ new File(getClassTestDir(), "filestore"));
+ }
+
+ static final int NUM_PEERS = 3;
+
+ @Test
+ public void testFileStoreAsync() throws Exception {
+ final CLUSTER cluster = newCluster(NUM_PEERS);
+ cluster.start();
+ RaftTestUtil.waitForLeader(cluster);
+
+ final FileStoreClient client = new FileStoreClient(cluster.getGroup(), getProperties());
+ final ExecutorService executor = Executors.newFixedThreadPool(20);
+
+ testSingleFile("foo", SizeInBytes.valueOf("10M"), executor, client); // One file: write, verify, then delete.
+ testMultipleFiles("file", 100, SizeInBytes.valueOf("1M"), executor, client); // 100 files sharing the same client and executor.
+
+ executor.shutdown(); // NOTE(review): the cleanup below is not in a try/finally, so it is skipped when either test above throws.
+ client.close();
+ cluster.shutdown();
+ }
+
+ private static void testSingleFile( // Chains write -> verify -> delete for a single file and waits for the chain.
+ String path, SizeInBytes fileLength, Executor executor, FileStoreClient client)
+ throws Exception {
+ LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
+
+ new Writer(path, fileLength, executor, () -> client)
+ .writeAsync()
+ .thenCompose(Writer::verifyAsync)
+ .thenCompose(Writer::deleteAsync)
+ .get(); // Blocks until the whole chain is done; a failed stage surfaces as an exception from get().
+ }
+
+ private static void testMultipleFiles( // Writes numFile files, waits for all writes, then verifies and deletes each one.
+ String pathPrefix, int numFile, SizeInBytes fileLength, Executor executor,
+ FileStoreClient client) throws Exception {
+ LOG.info("runTestMultipleFile with pathPrefix={}, numFile={}, fileLength={}",
+ pathPrefix, numFile, fileLength);
+
+ final List<CompletableFuture<Writer>> writerFutures = new ArrayList<>();
+ for (int i = 0; i < numFile; i++) {
+ final String path = String.format("%s%02d", pathPrefix, i);
+ final Callable<CompletableFuture<Writer>> callable = LogUtils.newCallable(LOG,
+ () -> new Writer(path, fileLength, executor, () -> client).writeAsync(),
+ () -> path + ":" + fileLength);
+ writerFutures.add(callable.call()); // Collects the write future for this file without waiting on it.
+ }
+
+ final List<Writer> writers = new ArrayList<>();
+ for(CompletableFuture<Writer> f : writerFutures) {
+ writers.add(f.get()); // Wait for every file's writes before starting verification.
+ }
+
+ writerFutures.clear(); // The list is reused for the verify/delete futures.
+ for (Writer w : writers) {
+ writerFutures.add(w.verifyAsync().thenCompose(Writer::deleteAsync));
+ }
+ for(CompletableFuture<Writer> f : writerFutures) {
+ f.get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
index 66aeb6a..7e9c57e 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
@@ -37,11 +37,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
extends BaseTest
@@ -78,7 +77,7 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
throws Exception {
LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
- try (final Writer w = new Writer(path, fileLength, newClient)) {
+ try (final Writer w = new Writer(path, fileLength, null, newClient)) {
w.write().verify().delete();
}
}
@@ -95,7 +94,7 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
for (int i = 0; i < numFile; i++) {
final String path = String.format("%s%02d", pathPrefix, i);
final Callable<Writer> callable = LogUtils.newCallable(LOG,
- () -> new Writer(path, fileLength, newClient).write(),
+ () -> new Writer(path, fileLength, null, newClient).write(),
() -> path + ":" + fileLength);
writerFutures.add(executor.submit(callable));
}
@@ -123,12 +122,15 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
final String fileName;
final SizeInBytes fileSize;
final FileStoreClient client;
+ final Executor asyncExecutor;
- Writer(String fileName, SizeInBytes fileSize, CheckedSupplier<FileStoreClient, IOException> newClient)
+ Writer(String fileName, SizeInBytes fileSize, Executor asyncExecutor,
+ CheckedSupplier<FileStoreClient, IOException> clientSupplier)
throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
- this.client = newClient.get();
+ this.client = clientSupplier.get();
+ this.asyncExecutor = asyncExecutor;
}
ByteBuffer randomBytes(int length, Random random) {
@@ -145,19 +147,59 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
for(int offset = 0; offset < size; ) {
final int remaining = size - offset;
- final int n = Math.min(remaining, buffer.length);
- final boolean close = n == remaining;
+ final int length = Math.min(remaining, buffer.length);
+ final boolean close = length == remaining;
- final ByteBuffer b = randomBytes(n, r);
+ final ByteBuffer b = randomBytes(length, r);
- LOG.trace("client write {}, offset={}", fileName, offset);
+ LOG.trace("write {}, offset={}, length={}, close? {}",
+ fileName, offset, length, close);
final long written = client.write(fileName, offset, close, b);
- Assert.assertEquals(n, written);
+ Assert.assertEquals(length, written);
offset += written;
}
return this;
}
+ CompletableFuture<Writer> writeAsync() { // Issues every chunk write without waiting in between; the returned future yields this Writer once all write callbacks have run.
+ Objects.requireNonNull(asyncExecutor, "asyncExecutor == null"); // Writers created with a null executor cannot use the async methods.
+ final Random r = new Random(seed);
+ final int size = fileSize.getSizeInt();
+
+ final CompletableFuture<Writer> returnFuture = new CompletableFuture<>();
+ final AtomicInteger callCount = new AtomicInteger(); // Writes issued whose completion callback has not run yet.
+ final AtomicInteger n = new AtomicInteger(); // Bytes submitted so far; also the offset of the next chunk.
+ for(; n.get() < size; ) {
+ final int offset = n.get();
+ final int remaining = size - offset;
+ final int length = Math.min(remaining, buffer.length);
+ final boolean close = length == remaining; // True only for the final chunk.
+
+ final ByteBuffer b = randomBytes(length, r);
+
+ callCount.incrementAndGet();
+ n.addAndGet(length);
+
+ LOG.trace("writeAsync {}, offset={}, length={}, close? {}",
+ fileName, offset, length, close);
+ client.writeAsync(fileName, offset, close, b)
+ .thenAcceptAsync(written -> Assert.assertEquals(length, (long)written), asyncExecutor)
+ .thenRun(() -> {
+ final int count = callCount.decrementAndGet();
+ LOG.trace("writeAsync {}, offset={}, length={}, close? {}: n={}, callCount={}",
+ fileName, offset, length, close, n.get(), count);
+ if (n.get() == size && count == 0) { // Every chunk has been submitted and none is still outstanding.
+ returnFuture.complete(this);
+ }
+ })
+ .exceptionally(e -> { // A failure in any stage above fails the overall future.
+ returnFuture.completeExceptionally(e);
+ return null;
+ });
+ }
+ return returnFuture; // NOTE(review): when size == 0 the loop body never runs, so this future is never completed.
+ }
+
Writer verify() throws IOException {
final Random r = new Random(seed);
final int size = fileSize.getSizeInt();
@@ -165,18 +207,60 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
for(int offset = 0; offset < size; ) {
final int remaining = size - offset;
final int n = Math.min(remaining, buffer.length);
-
final ByteString read = client.read(fileName, offset, n);
- Assert.assertEquals(n, read.size());
-
- final ByteBuffer b = randomBytes(n, r);
-
- assertBuffers(offset, n, b, read.asReadOnlyByteBuffer());
+ final ByteBuffer expected = randomBytes(n, r);
+ verify(read, offset, n, expected);
offset += n;
}
return this;
}
+ CompletableFuture<Writer> verifyAsync() { // Issues every chunk read without waiting in between and checks each against data regenerated from the same seed.
+ Objects.requireNonNull(asyncExecutor, "asyncExecutor == null"); // Writers created with a null executor cannot use the async methods.
+ final Random r = new Random(seed);
+ final int size = fileSize.getSizeInt();
+
+ final CompletableFuture<Writer> returnFuture = new CompletableFuture<>();
+ final AtomicInteger callCount = new AtomicInteger(); // Reads issued whose completion callback has not run yet.
+ final AtomicInteger n = new AtomicInteger(); // Bytes requested so far; also the offset of the next chunk.
+ for(; n.get() < size; ) {
+ final int offset = n.get();
+ final int remaining = size - offset;
+ final int length = Math.min(remaining, buffer.length);
+
+ callCount.incrementAndGet();
+ n.addAndGet(length);
+ final ByteBuffer expected = ByteString.copyFrom(randomBytes(length, r)).asReadOnlyByteBuffer(); // Copied up front; presumably randomBytes reuses a shared buffer (TODO confirm).
+
+ client.readAsync(fileName, offset, length)
+ .thenAcceptAsync(read -> verify(read, offset, length, expected), asyncExecutor)
+ .thenRun(() -> {
+ final int count = callCount.decrementAndGet();
+ LOG.trace("verifyAsync {}, offset={}, length={}: n={}, callCount={}",
+ fileName, offset, length, n.get(), count);
+ if (n.get() == size && count == 0) { // Every chunk has been requested and none is still outstanding.
+ returnFuture.complete(this);
+ }
+ })
+ .exceptionally(e -> { // A failure in any stage above, including an assertion failure in verify, fails the overall future.
+ returnFuture.completeExceptionally(e);
+ return null;
+ });
+ }
+ Assert.assertEquals(size, n.get()); // Sanity check: the loop requested exactly the whole file.
+ return returnFuture; // NOTE(review): when size == 0 the loop body never runs, so this future is never completed.
+ }
+
+ void verify(ByteString read, int offset, int length, ByteBuffer expected) { // Asserts that the chunk read has the requested length and matches the expected bytes.
+ Assert.assertEquals(length, read.size());
+ assertBuffers(offset, length, expected, read.asReadOnlyByteBuffer());
+ }
+
+ CompletableFuture<Writer> deleteAsync() { // Deletes the file asynchronously; the future yields this Writer so calls can be chained.
+ Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
+ return client.deleteAsync(fileName).thenApplyAsync(reply -> this, asyncExecutor); // The reply value is discarded.
+ }
+
Writer delete() throws IOException {
client.delete(fileName);
return this;
@@ -193,8 +277,8 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
Assert.assertEquals(expected, computed);
} catch(AssertionError e) {
LOG.error("Buffer mismatched at offset=" + offset + ", length=" + length
- + "expected = " + StringUtils.bytes2HexString(expected) + "\n"
- + "computed = " + StringUtils.bytes2HexString(computed) + "\n", e);
+ + "\n expected = " + StringUtils.bytes2HexString(expected)
+ + "\n computed = " + StringUtils.bytes2HexString(computed), e);
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e69b13a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
new file mode 100644
index 0000000..02bd2b0
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+
+public class TestFileStoreAsyncWithGrpc // Concrete binding of FileStoreAsyncBaseTest to MiniRaftClusterWithGRpc; adds no members of its own.
+ extends FileStoreAsyncBaseTest<MiniRaftClusterWithGRpc>
+ implements MiniRaftClusterWithGRpc.FactoryGet {
+}