You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/28 07:50:05 UTC

[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

szetszwo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r531966544



##########
File path: ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
##########
@@ -37,4 +39,7 @@
 public interface DataStreamApi {
   /** Create a stream to write data. */
   DataStreamOutput stream();
+
+  /** Create a stream by providing a customized header message. */
+  DataStreamOutput stream(ByteBuffer headerContent);

Review comment:
       Let's call the parameter headerMessage.

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+
   @Override
   public void close() {
     writer.shutdownNow();
     committer.shutdownNow();
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  public FileStoreDataChannel createDataChannel(String p) {
+    final Path root = getRoot();
+    final Path relative = Paths.get(p).normalize();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    return new FileStoreDataChannel(full);
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private OutputStream out;
+    private final WritableByteChannel channel;

Review comment:
       Change them to RandomAccessFile and FileChannel.

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+
   @Override
   public void close() {
     writer.shutdownNow();
     committer.shutdownNow();
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  public FileStoreDataChannel createDataChannel(String p) {
+    final Path root = getRoot();
+    final Path relative = Paths.get(p).normalize();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    return new FileStoreDataChannel(full);
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private OutputStream out;
+    private final WritableByteChannel channel;
+    FileStoreDataChannel(Path path) {
+      try {
+        this.out = FileUtils.createNewFile(path);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      this.channel = Channels.newChannel(out);
+    }
+
+    @Override
+    public void force(boolean metadata) throws IOException {
+
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      return channel.write(src);
+    }
+
+    @Override
+    public boolean isOpen() {
+      return channel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+      channel.close();

Review comment:
       Close the RandomAccessFile.

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +154,46 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(String path) {

Review comment:
       Just pass DataChannel.

##########
File path: ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -184,6 +186,14 @@ public DataStreamOutputRpc stream(RaftClientRequest request) {
     return new DataStreamOutputImpl(request);
   }
 
+  @Override
+  public DataStreamOutputRpc stream(ByteBuffer headerContent) {
+    ByteString payload = ByteString.copyFrom(headerContent);

Review comment:
       Use Optional to support null,
   ```
       final Message m = Optional.ofNullable(headerContent).map(ByteString::copyFrom).map(Message::valueOf).orElse(null);
   ```

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+
   @Override
   public void close() {
     writer.shutdownNow();
     committer.shutdownNow();
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  public FileStoreDataChannel createDataChannel(String p) {
+    final Path root = getRoot();
+    final Path relative = Paths.get(p).normalize();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    return new FileStoreDataChannel(full);
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private OutputStream out;
+    private final WritableByteChannel channel;
+    FileStoreDataChannel(Path path) {
+      try {
+        this.out = FileUtils.createNewFile(path);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      this.channel = Channels.newChannel(out);
+    }
+
+    @Override
+    public void force(boolean metadata) throws IOException {
+

Review comment:
       Call FileChannel.force(..)

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+
   @Override
   public void close() {
     writer.shutdownNow();
     committer.shutdownNow();
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  public FileStoreDataChannel createDataChannel(String p) {
+    final Path root = getRoot();
+    final Path relative = Paths.get(p).normalize();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    return new FileStoreDataChannel(full);
+  }

Review comment:
       Make it async
   ```
     CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
       return CompletableFuture.supplyAsync(() -> {
         final Path root = getRoot();
         final Path relative = Paths.get(p).normalize();
         final Path full = root.resolve(relative).normalize().toAbsolutePath();
         try {
           return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "w"));
         } catch (IOException e) {
           throw new CompletionException("Failed to create " + p, e);
         }
       }, writer);
     }
   ```

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+
   @Override
   public void close() {
     writer.shutdownNow();
     committer.shutdownNow();
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  public FileStoreDataChannel createDataChannel(String p) {
+    final Path root = getRoot();
+    final Path relative = Paths.get(p).normalize();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    return new FileStoreDataChannel(full);
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private OutputStream out;
+    private final WritableByteChannel channel;
+    FileStoreDataChannel(Path path) {
+      try {
+        this.out = FileUtils.createNewFile(path);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      this.channel = Channels.newChannel(out);
+    }

Review comment:
       Pass RandomAccessFile
   ```
       FileStoreDataChannel(RandomAccessFile raf) {
         this.raf = raf;
         this.channel = raf.getChannel();
       }
   ```

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+

Review comment:
       Review you own pull request and revert changes like this.

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +154,46 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(String path) {
+      dataChannel = files.createDataChannel(path);
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final StreamWriteRequestHeaderProto proto;
+    try {
+      proto = StreamWriteRequestHeaderProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return CompletableFuture.supplyAsync(() -> {
+      return new LocalStream(proto.getPath().toStringUtf8());
+    });

Review comment:
       Change it to
   ```
       return files.createDataChannel(proto.getPath().toStringUtf8())
           .thenApply(LocalStream::new);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org