You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/08 08:27:30 UTC

[incubator-ratis] branch master updated: RATIS-1217. Fix filestore failed to readStateMachineData when use AsyncApi (#335)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new be9a0c1  RATIS-1217. Fix filestore failed to readStateMachineData when use AsyncApi (#335)
be9a0c1 is described below

commit be9a0c1d8987f849ca00ac9a4107d9402b9ab7cb
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Tue Dec 8 16:27:24 2020 +0800

    RATIS-1217. Fix filestore failed to readStateMachineData when use AsyncApi (#335)
---
 .../apache/ratis/examples/filestore/FileInfo.java  | 41 ++++++++++++++++------
 .../apache/ratis/examples/filestore/FileStore.java |  4 +--
 .../ratis/examples/filestore/FileStoreClient.java  |  2 +-
 .../examples/filestore/FileStoreStateMachine.java  |  4 +--
 .../ratis/examples/filestore/cli/DataStream.java   |  4 +--
 .../ratis/examples/filestore/cli/Server.java       |  3 ++
 6 files changed, 41 insertions(+), 17 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index 5892036..886a646 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -62,7 +62,12 @@ abstract class FileInfo {
     return relativePath;
   }
 
-  long getSize() {
+  long getWriteSize() {
+    throw new UnsupportedOperationException(
+        "File " + getRelativePath() + " size is unknown.");
+  }
+
+  long getCommittedSize() {
     throw new UnsupportedOperationException(
         "File " + getRelativePath() + " size is unknown.");
   }
@@ -71,12 +76,16 @@ abstract class FileInfo {
     // no-op
   }
 
-  ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length)
+  ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length, boolean readCommitted)
       throws IOException {
     flush();
-    if (offset + length > getSize()) {
-      throw new IOException("Failed to read: offset (=" + offset
-          + " + length (=" + length + ") > size = " + getSize()
+    if (readCommitted && offset + length > getCommittedSize()) {
+      throw new IOException("Failed to read Committed: offset (=" + offset
+          + " + length (=" + length + ") > size = " + getCommittedSize()
+          + ", path=" + getRelativePath());
+    } else if (offset + length > getWriteSize()){
+      throw new IOException("Failed to read Wrote: offset (=" + offset
+          + " + length (=" + length + ") > size = " + getWriteSize()
           + ", path=" + getRelativePath());
     }
 
@@ -95,16 +104,23 @@ abstract class FileInfo {
   }
 
   static class ReadOnly extends FileInfo {
-    private final long size;
+    private final long committedSize;
+    private final long writeSize;
 
     ReadOnly(UnderConstruction f) {
       super(f.getRelativePath());
-      this.size = f.getSize();
+      this.committedSize = f.getCommittedSize();
+      this.writeSize = f.getWriteSize();
+    }
+
+    @Override
+    long getCommittedSize() {
+      return committedSize;
     }
 
     @Override
-    long getSize() {
-      return size;
+    long getWriteSize() {
+      return writeSize;
     }
   }
 
@@ -185,10 +201,15 @@ abstract class FileInfo {
     }
 
     @Override
-    long getSize() {
+    long getCommittedSize() {
       return committedSize;
     }
 
+    @Override
+    long getWriteSize() {
+      return writeSize;
+    }
+
     CompletableFuture<Integer> submitCreate(
         CheckedFunction<Path, Path, IOException> resolver, ByteString data, boolean close,
         ExecutorService executor, RaftPeerId id, long index) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index fe5ec34..84fab1a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -137,7 +137,7 @@ public class FileStore implements Closeable {
     return full;
   }
 
-  CompletableFuture<ReadReplyProto> read(String relative, long offset, long length) {
+  CompletableFuture<ReadReplyProto> read(String relative, long offset, long length, boolean readCommitted) {
     final Supplier<String> name = () -> "read(" + relative
         + ", " + offset + ", " + length + ") @" + getId();
     final CheckedSupplier<ReadReplyProto, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
@@ -146,7 +146,7 @@ public class FileStore implements Closeable {
           .setResolvedPath(FileStoreCommon.toByteString(info.getRelativePath()))
           .setOffset(offset);
 
-      final ByteString bytes = info.read(this::resolve, offset, length);
+      final ByteString bytes = info.read(this::resolve, offset, length, readCommitted);
       return reply.setData(bytes).build();
     }, name);
     return submit(task, reader);
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 786f7bd..51dc100 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -175,7 +175,7 @@ public class FileStoreClient implements Closeable {
     final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
         .setPath(ProtoUtils.toByteString(path))
         .setOffset(offset)
-        .setLength(data.position())
+        .setLength(data.remaining())
         .setClose(close);
 
     final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index b5940ad..aa99240 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -87,7 +87,7 @@ public class FileStoreStateMachine extends BaseStateMachine {
     }
 
     final String path = proto.getPath().toStringUtf8();
-    return files.read(path, proto.getOffset(), proto.getLength())
+    return files.read(path, proto.getOffset(), proto.getLength(), true)
         .thenApply(reply -> Message.valueOf(reply.toByteString()));
   }
 
@@ -149,7 +149,7 @@ public class FileStoreStateMachine extends BaseStateMachine {
 
     final WriteRequestHeaderProto h = proto.getWriteHeader();
     CompletableFuture<ExamplesProtos.ReadReplyProto> reply =
-        files.read(h.getPath().toStringUtf8(), h.getOffset(), h.getLength());
+        files.read(h.getPath().toStringUtf8(), h.getOffset(), h.getLength(), false);
 
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index b31229b..dd653be 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -132,7 +132,7 @@ public class DataStream extends Client {
       } else if (bytesRead > 0) {
         offset += bytesRead;
 
-        final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer(), offset == fileSize);
+        final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer());
         f.thenRun(buf::release);
         futures.add(f);
       }
@@ -145,7 +145,7 @@ public class DataStream extends Client {
       FileChannel fileChannel) throws IOException {
     List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
     MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, getFileSizeInBytes());
-    futures.add(dataStreamOutput.writeAsync(mappedByteBuffer, true));
+    futures.add(dataStreamOutput.writeAsync(mappedByteBuffer));
     return futures;
   }
 
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index d135c59..192d721 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -40,6 +40,7 @@ import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 
 import java.io.File;
@@ -79,6 +80,8 @@ public class Server extends SubCommandBase {
     RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY);
     properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY, Integer.MAX_VALUE);
     RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
+    RaftServerConfigKeys.Write.setElementLimit(properties, 40960);
+    RaftServerConfigKeys.Write.setByteLimit(properties, SizeInBytes.valueOf("1000MB"));
     ConfUtils.setFile(properties::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
         storageDir);
     StateMachine stateMachine = new FileStoreStateMachine(properties);