You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/08 08:27:30 UTC
[incubator-ratis] branch master updated: RATIS-1217. Fix filestore failed to readStateMachineData when use AsyncApi (#335)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new be9a0c1 RATIS-1217. Fix filestore failed to readStateMachineData when use AsyncApi (#335)
be9a0c1 is described below
commit be9a0c1d8987f849ca00ac9a4107d9402b9ab7cb
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Tue Dec 8 16:27:24 2020 +0800
RATIS-1217. Fix filestore failed to readStateMachineData when use AsyncApi (#335)
---
.../apache/ratis/examples/filestore/FileInfo.java | 41 ++++++++++++++++------
.../apache/ratis/examples/filestore/FileStore.java | 4 +--
.../ratis/examples/filestore/FileStoreClient.java | 2 +-
.../examples/filestore/FileStoreStateMachine.java | 4 +--
.../ratis/examples/filestore/cli/DataStream.java | 4 +--
.../ratis/examples/filestore/cli/Server.java | 3 ++
6 files changed, 41 insertions(+), 17 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index 5892036..886a646 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -62,7 +62,12 @@ abstract class FileInfo {
return relativePath;
}
- long getSize() {
+ long getWriteSize() {
+ throw new UnsupportedOperationException(
+ "File " + getRelativePath() + " size is unknown.");
+ }
+
+ long getCommittedSize() {
throw new UnsupportedOperationException(
"File " + getRelativePath() + " size is unknown.");
}
@@ -71,12 +76,16 @@ abstract class FileInfo {
// no-op
}
- ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length)
+ ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length, boolean readCommitted)
throws IOException {
flush();
- if (offset + length > getSize()) {
- throw new IOException("Failed to read: offset (=" + offset
- + " + length (=" + length + ") > size = " + getSize()
+ if (readCommitted && offset + length > getCommittedSize()) {
+ throw new IOException("Failed to read Committed: offset (=" + offset
+ + " + length (=" + length + ") > size = " + getCommittedSize()
+ + ", path=" + getRelativePath());
+ } else if (offset + length > getWriteSize()){
+ throw new IOException("Failed to read Wrote: offset (=" + offset
+ + " + length (=" + length + ") > size = " + getWriteSize()
+ ", path=" + getRelativePath());
}
@@ -95,16 +104,23 @@ abstract class FileInfo {
}
static class ReadOnly extends FileInfo {
- private final long size;
+ private final long committedSize;
+ private final long writeSize;
ReadOnly(UnderConstruction f) {
super(f.getRelativePath());
- this.size = f.getSize();
+ this.committedSize = f.getCommittedSize();
+ this.writeSize = f.getWriteSize();
+ }
+
+ @Override
+ long getCommittedSize() {
+ return committedSize;
}
@Override
- long getSize() {
- return size;
+ long getWriteSize() {
+ return writeSize;
}
}
@@ -185,10 +201,15 @@ abstract class FileInfo {
}
@Override
- long getSize() {
+ long getCommittedSize() {
return committedSize;
}
+ @Override
+ long getWriteSize() {
+ return writeSize;
+ }
+
CompletableFuture<Integer> submitCreate(
CheckedFunction<Path, Path, IOException> resolver, ByteString data, boolean close,
ExecutorService executor, RaftPeerId id, long index) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index fe5ec34..84fab1a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -137,7 +137,7 @@ public class FileStore implements Closeable {
return full;
}
- CompletableFuture<ReadReplyProto> read(String relative, long offset, long length) {
+ CompletableFuture<ReadReplyProto> read(String relative, long offset, long length, boolean readCommitted) {
final Supplier<String> name = () -> "read(" + relative
+ ", " + offset + ", " + length + ") @" + getId();
final CheckedSupplier<ReadReplyProto, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
@@ -146,7 +146,7 @@ public class FileStore implements Closeable {
.setResolvedPath(FileStoreCommon.toByteString(info.getRelativePath()))
.setOffset(offset);
- final ByteString bytes = info.read(this::resolve, offset, length);
+ final ByteString bytes = info.read(this::resolve, offset, length, readCommitted);
return reply.setData(bytes).build();
}, name);
return submit(task, reader);
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 786f7bd..51dc100 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -175,7 +175,7 @@ public class FileStoreClient implements Closeable {
final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
.setPath(ProtoUtils.toByteString(path))
.setOffset(offset)
- .setLength(data.position())
+ .setLength(data.remaining())
.setClose(close);
final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index b5940ad..aa99240 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -87,7 +87,7 @@ public class FileStoreStateMachine extends BaseStateMachine {
}
final String path = proto.getPath().toStringUtf8();
- return files.read(path, proto.getOffset(), proto.getLength())
+ return files.read(path, proto.getOffset(), proto.getLength(), true)
.thenApply(reply -> Message.valueOf(reply.toByteString()));
}
@@ -149,7 +149,7 @@ public class FileStoreStateMachine extends BaseStateMachine {
final WriteRequestHeaderProto h = proto.getWriteHeader();
CompletableFuture<ExamplesProtos.ReadReplyProto> reply =
- files.read(h.getPath().toStringUtf8(), h.getOffset(), h.getLength());
+ files.read(h.getPath().toStringUtf8(), h.getOffset(), h.getLength(), false);
return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index b31229b..dd653be 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -132,7 +132,7 @@ public class DataStream extends Client {
} else if (bytesRead > 0) {
offset += bytesRead;
- final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer(), offset == fileSize);
+ final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer());
f.thenRun(buf::release);
futures.add(f);
}
@@ -145,7 +145,7 @@ public class DataStream extends Client {
FileChannel fileChannel) throws IOException {
List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, getFileSizeInBytes());
- futures.add(dataStreamOutput.writeAsync(mappedByteBuffer, true));
+ futures.add(dataStreamOutput.writeAsync(mappedByteBuffer));
return futures;
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index d135c59..192d721 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -40,6 +40,7 @@ import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import java.io.File;
@@ -79,6 +80,8 @@ public class Server extends SubCommandBase {
RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY);
properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY, Integer.MAX_VALUE);
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
+ RaftServerConfigKeys.Write.setElementLimit(properties, 40960);
+ RaftServerConfigKeys.Write.setByteLimit(properties, SizeInBytes.valueOf("1000MB"));
ConfUtils.setFile(properties::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
storageDir);
StateMachine stateMachine = new FileStoreStateMachine(properties);