Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/24 23:31:40 UTC

[GitHub] [incubator-ratis] amaliujia opened a new pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

amaliujia opened a new pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298


   https://issues.apache.org/jira/browse/RATIS-1111
   
   ## How was this patch tested?
   
   UT
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r530042599



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
##########
@@ -141,6 +155,14 @@ public long write(String path, long offset, boolean close, ByteBuffer buffer)
     return WriteReplyProto.parseFrom(reply).getLength();
   }
 
+  public DataStreamOutput getStreamOutput(String path) {
+    final StreamWriteRequestHeaderProto header = StreamWriteRequestHeaderProto.newBuilder()
+        .setPath(ProtoUtils.toByteString(path))
+        .build();
+    RaftClientRequest request = client.createDataStreamHeaderRequest(Message.valueOf(header.toByteString()));
+    return client.getDataStreamApi().stream(request);

Review comment:
       FileStoreClient should not create RaftClientRequest by itself. It will be automatically created.  Just send the path.
   ```
       return client.getDataStreamApi().stream(ProtoUtils.toByteString(path).asReadOnlyByteBuffer());
   ```
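The suggestion above passes only the path, as a read-only byte buffer. As a rough illustration of that round trip using nothing but the JDK (the class and method names here are hypothetical and not part of Ratis; the real code would go through `ProtoUtils` and `ByteString` as in the comment), the encoding and the matching decoding on the receiving side could look like this:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, JDK only: not a Ratis class.
final class PathHeaderSketch {
  /** Encode the path as a read-only UTF-8 buffer, the shape the review comment suggests sending. */
  static ByteBuffer encode(String path) {
    return ByteBuffer.wrap(path.getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
  }

  /** Decode on the receiving side; duplicate() leaves the caller's buffer position untouched. */
  static String decode(ByteBuffer header) {
    return StandardCharsets.UTF_8.decode(header.duplicate()).toString();
  }
}
```

The read-only view keeps the caller from mutating the header after it has been handed to the stream.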

##########
File path: ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
##########
@@ -37,4 +39,6 @@
 public interface DataStreamApi {
   /** Create a stream to write data. */
   DataStreamOutput stream();
+
+  DataStreamOutput stream(RaftClientRequest request);

Review comment:
       Same as RaftClient, RaftClientRequest is for internal use only in Ratis. Please don't add this. (Similar to AsyncApi, the RaftClientRequest is for internal use only.)

##########
File path: ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
##########
@@ -68,6 +68,8 @@
   /** Send set configuration request to the raft service. */
   RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
 
+  RaftClientRequest createDataStreamHeaderRequest(Message message);
+

Review comment:
       DataStream header is for internal use only in Ratis.  Please don't add this.  (Similar to AsyncApi, the RaftClientRequest is for internal use only.)







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533437943



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +163,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final StreamWriteRequestHeaderProto proto;
+    try {
+      proto = StreamWriteRequestHeaderProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(stream, entry);

Review comment:
       In applyTransaction(..), a log entry will be given and then the state machine has to apply it.
   
   For Streaming in FileStore, we may simply check if the stream is closed and the file exists since the data should have already been written to the file.
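A minimal sketch of the check described above, using only JDK types. The class name, the method name, and the extra length comparison are assumptions made for illustration (the length check mirrors what `streamCommit` does elsewhere in this PR); the real state machine would work with its own `DataStream` and path lookup:

```java
import java.io.IOException;
import java.nio.channels.Channel;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, JDK only: not a Ratis class.
final class StreamCommitCheckSketch {
  /** True when the channel is closed and the file is a regular file of the expected length. */
  static boolean isCommitted(Channel channel, Path file, long expectedLength) throws IOException {
    if (channel.isOpen()) {
      return false; // data may still be arriving
    }
    return Files.isRegularFile(file) && Files.size(file) == expectedLength;
  }
}
```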







[GitHub] [incubator-ratis] amaliujia commented on pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-735040004


   @szetszwo sorry for the slight delay on this PR.
   
   Sending a `StreamRequestHeaderProto` which includes the path and data length makes sense to me. This is also what I am trying to achieve in this PR.
   
   The question so far is: how do we allow users to send a StreamRequestHeaderProto (or any other proto) to the state machine?
   
   I think there are two models:
   1. Make it part of the stream header request. This requires updating DataStreamApi to expose a method (e.g. `DataStreamOutput stream(ByteBuffer headerContent)`). In this model, the header request carries the information needed to initialize a data stream in the state machine.
   2. Keep the stream header as it is now and have the user send the `StreamRequestHeaderProto` with `DataStreamOutput.writeAsync`, so that it travels as one of the stream DATA packets. In this case, the state machine needs to understand that not every DATA packet is data; one of them could be a special proto.
   
   To me, model 1 is easier to use. What do you think?
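To make the difference between the two models concrete, here is a small in-memory sketch. `Receiver` is a hypothetical stand-in for the state machine side of a stream, not a Ratis type, and the packet handling is simplified to plain buffers:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, JDK only: not a Ratis class.
final class HeaderModelsSketch {
  static final class Receiver {
    String path;                                   // learned from the header
    final List<ByteBuffer> data = new ArrayList<>();

    /** Model 1: the header arrives on its own, before any DATA packet. */
    void onHeader(ByteBuffer header) {
      path = StandardCharsets.UTF_8.decode(header.duplicate()).toString();
    }

    /** Model 1: every DATA packet is plain data. */
    void onData(ByteBuffer packet) {
      data.add(packet);
    }

    /** Model 2: the receiver itself must treat the first DATA packet as the header. */
    void onDataModel2(ByteBuffer packet) {
      if (path == null) {
        onHeader(packet);
      } else {
        onData(packet);
      }
    }
  }
}
```

In model 2 the "first packet is special" rule lives in application code, which is the extra burden the comment above points out.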





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r530050628



##########
File path: ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final Logger LOG = LoggerFactory.getLogger(FileStoreStreamingBaseTest.class);
+
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        FileStoreStateMachine.class, StateMachine.class);
+    ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
+        new File(getClassTestDir(), "filestore"));
+  }
+
+  static final int NUM_PEERS = 3;
+  static final int MODULUS = 23;
+
+  @Test
+  public void testFileStoreStream() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_PEERS);
+    cluster.start();
+    RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_PEERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    final CheckedSupplier<FileStoreClient, IOException> newClient =
+        () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+    // TODO: configurable buffer size
+    final int bufferSize = 10_000;
+    testSingleFile("foo", SizeInBytes.valueOf("2M"), bufferSize, newClient, leader);
+
+    cluster.shutdown();
+  }
+
+  private void testSingleFile(
+      String path, SizeInBytes fileLength, int bufferSize, CheckedSupplier<FileStoreClient,
+      IOException> newClient, RaftServerImpl leader)
+      throws Exception {
+    LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
+    final int size = fileLength.getSizeInt();
+    FileStoreClient client = newClient.get();
+    DataStreamOutput dataStreamOutput = client.getStreamOutput(path);
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    final List<Integer> sizes = new ArrayList<>();
+
+    for(int offset = 0; offset < size; ) {
+      final int remaining = size - offset;
+      final int length = Math.min(remaining, bufferSize);
+      final boolean close = length == remaining;
+
+      LOG.trace("write {}, offset={}, length={}, close? {}",
+          path, offset, length, close);
+      final ByteBuffer bf = initBuffer(0, length);
+      futures.add(dataStreamOutput.writeAsync(bf, close));
+      sizes.add(length);
+      offset += length;
+    }
+
+    // TODO: handle when any of the writeAsync has failed.
+    // check writeAsync requests
+    for (int i = 0; i < futures.size(); i++) {
+      final DataStreamReply reply = futures.get(i).join();
+      Assert.assertTrue(reply.isSuccess());
+      Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
+      Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? DataStreamPacketHeaderProto.Type.STREAM_DATA_SYNC : DataStreamPacketHeaderProto.Type.STREAM_DATA);
+    }
+
+    dataStreamOutput.closeAsync().join();
+  }
+
+  static ByteBuffer initBuffer(int offset, int size) {

Review comment:
       The unit test could use generated data in memory.  For the CLI, it should use real files on local disk for running benchmarks.
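For the in-memory case, a generator along the following lines would be enough. The body of `initBuffer` is not shown in this thread, so the byte pattern below is an assumption built around the `MODULUS = 23` constant in the test, and the class name is hypothetical:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: the fill pattern is assumed, not taken from the PR.
final class GeneratedDataSketch {
  static final int MODULUS = 23;

  /** Fill a buffer of the given size with (offset + i) % MODULUS, then flip it for reading. */
  static ByteBuffer initBuffer(int offset, int size) {
    final ByteBuffer buffer = ByteBuffer.allocate(size);
    for (int i = 0; i < size; i++) {
      buffer.put((byte) ((offset + i) % MODULUS));
    }
    buffer.flip();
    return buffer;
  }
}
```

A deterministic pattern keyed on the offset lets a reader verify any slice of the file without keeping the written data around.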







[GitHub] [incubator-ratis] amaliujia commented on pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-733293047


   cc @szetszwo @runzhiwang  please let me know if you have any early feedback.





[GitHub] [incubator-ratis] amaliujia edited a comment on pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia edited a comment on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-736082327


   @szetszwo sorry for the slight delay on this PR.
   
   1. Until this moment I am still not sure what `DataApi.link(..)` is supposed to do for `FileStoreStateMachine`. Any hints? Given a DataStream and a LogEntryProto, what operation is required to link these two objects?
   
   2. I updated the unit test and found that transaction cannot be applied successfully. The debugger did not stop at `DataApi.link(..)`, so the cause might not be that I haven't implemented the `DataApi.link` API. I am still debugging why the transaction cannot be applied (the async API is a bit hard to debug) and will post an update when I find something.





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534452736



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);
+      }
+    });
+  }
+
+  public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create " + p, e);
+      }
+    }, writer);
+  }
+
+  private Path buildFullPath(String relative) {
+    final Path root = getRoot();
+    final Path relativePath = Paths.get(relative).normalize();
+    return root.resolve(relativePath).normalize().toAbsolutePath();
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private final RandomAccessFile randomAccessFile;
+    private final FileChannel fileChannel;

Review comment:
       Good catch. The channel is the unique FileChannel object associated with a random access file.

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);
+      }
+    });
+  }
+
+  public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create " + p, e);
+      }
+    }, writer);
+  }
+
+  private Path buildFullPath(String relative) {
+    final Path root = getRoot();
+    final Path relativePath = Paths.get(relative).normalize();
+    return root.resolve(relativePath).normalize().toAbsolutePath();
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private final RandomAccessFile randomAccessFile;
+    private final FileChannel fileChannel;
+
+    FileStoreDataChannel(RandomAccessFile file) {
+      randomAccessFile = file;
+      fileChannel = file.getChannel();
+    }
+
+    @Override
+    public void force(boolean metadata) throws IOException {
+      fileChannel.force(metadata);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      return fileChannel.write(src);
+    }
+
+    @Override
+    public boolean isOpen() {
+      return fileChannel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {

Review comment:
       Yes it is enough: https://docs.oracle.com/javase/8/docs/api/java/io/RandomAccessFile.html#close--
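The linked Javadoc states that closing a `RandomAccessFile` also closes its associated channel. A small JDK-only check of that behavior (the class and method names are hypothetical) could look like this:

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

// Hypothetical demonstration, JDK only.
final class CloseChannelSketch {
  /** Returns {openBeforeClose, openAfterClose} for the channel of a temporary RandomAccessFile. */
  static boolean[] channelStateAroundClose() throws IOException {
    final File tmp = File.createTempFile("raf-close", ".bin");
    try {
      final RandomAccessFile raf = new RandomAccessFile(tmp, "rw");
      final FileChannel channel = raf.getChannel();
      final boolean before = channel.isOpen();
      raf.close(); // closes the associated channel as well
      return new boolean[] {before, channel.isOpen()};
    } finally {
      tmp.delete();
    }
  }
}
```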







[GitHub] [incubator-ratis] szetszwo commented on pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-736141905


   > 1. Until this moment I am still not sure what DataApi.link(..) is supposed to do for FileStoreStateMachine. ...
   
   link(..) is to tell the StateMachine that the stream data must be persisted with the log entry.   Later, we can ask the StateMachine to handle the data for log operations such as appendEntry and truncate.  Otherwise, the StateMachine won't know whether the log entry is persisted.
   
   > 2. I updated the unit test and found that transaction cannot be applied successfully. ...
   
   Please feel free to upload the code.  I can take a look.
   
   
   





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r535736101



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +158,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final FileStoreRequestProto proto;
+    try {
+      proto = FileStoreRequestProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getStream().getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(entry, stream);
+    return CompletableFuture.completedFuture(null);

Review comment:
       @amaliujia Use `stream.getDataChannel().isOpen()` to check whether the stream is already closed.
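A sketch of how such a check might sit inside a `link`-style method, written against `java.nio.channels.Channel` rather than the Ratis `DataStream` type. The class name, the `logIndex` parameter, and the decision to fail when the channel is still open are assumptions made for illustration:

```java
import java.nio.channels.Channel;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, JDK only: not the FileStoreStateMachine implementation.
final class LinkCheckSketch {
  /** Completes normally only when the channel is non-null and already closed. */
  static CompletableFuture<Void> link(Channel channel, long logIndex) {
    final CompletableFuture<Void> future = new CompletableFuture<>();
    if (channel == null) {
      future.completeExceptionally(new IllegalStateException("Null stream: index=" + logIndex));
    } else if (channel.isOpen()) {
      future.completeExceptionally(new IllegalStateException("Stream still open: index=" + logIndex));
    } else {
      future.complete(null);
    }
    return future;
  }
}
```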







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533447995



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +163,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final StreamWriteRequestHeaderProto proto;
+    try {
+      proto = StreamWriteRequestHeaderProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(stream, entry);

Review comment:
       link(..) is to tell the state machine which stream to use in applyTransaction(..).  In FileStore, the request header has the path so that we may use it to look up the file.  The path is the key to get the corresponding data.   link(..) in FileStore may not be very useful since we have the path in the header.
   
   Suppose we don't have the path in the header.  Then link(..) becomes very important since it can map LogEntry to data.  Otherwise, we don't know which file should be used in applyTransaction(..).
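The mapping role of link(..) described above can be sketched with a plain map keyed by log index. This is a simplified, hypothetical illustration: the class, its methods, and the use of a `long` index in place of `LogEntryProto` are assumptions, not the Ratis API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, JDK only: S stands in for the stream type.
final class LinkMapSketch<S> {
  private final Map<Long, S> streamsByLogIndex = new ConcurrentHashMap<>();

  /** link(..): remember which stream carries the data for the given log index. */
  void link(long logIndex, S stream) {
    streamsByLogIndex.put(logIndex, stream);
  }

  /** applyTransaction(..): fetch and remove the stream for the entry being applied; null if none was linked. */
  S takeForApply(long logIndex) {
    return streamsByLogIndex.remove(logIndex);
  }
}
```

When the header already carries the path, as in FileStore, the lookup can go through the path instead and this map becomes optional.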







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r530007065



##########
File path: ratis-examples/src/test/java/org/apache/ratis/examples/filestore/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import java.io.IOException;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.statemachine.StateMachine;
+
+public class MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty extends MiniRaftClusterWithGrpc {

Review comment:
       One thing I am not sure about so far is how to reuse, in `ratis-examples`, the test code we have written in `ratis-test`.
   
   Currently I just make a simple copy.
   
   Should we make ratis-examples depend on ratis-test?







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534454817



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);
+      }
+    });
+  }
+
+  public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create " + p, e);
+      }
+    }, writer);
+  }
+
+  private Path buildFullPath(String relative) {

Review comment:
       Right :)







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533943851



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);
+      }
+    });
+  }
+
+  public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create " + p, e);
+      }
+    }, writer);
+  }
+
+  private Path buildFullPath(String relative) {
+    final Path root = getRoot();
+    final Path relativePath = Paths.get(relative).normalize();
+    return root.resolve(relativePath).normalize().toAbsolutePath();
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private final RandomAccessFile randomAccessFile;
+    private final FileChannel fileChannel;
+
+    FileStoreDataChannel(RandomAccessFile file) {
+      randomAccessFile = file;
+      fileChannel = file.getChannel();
+    }
+
+    @Override
+    public void force(boolean metadata) throws IOException {
+      fileChannel.force(metadata);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      return fileChannel.write(src);
+    }
+
+    @Override
+    public boolean isOpen() {
+      return fileChannel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+      fileChannel.close();
+      randomAccessFile.close();

Review comment:
       Before closing the file, we should call force() to sync the data to disk.
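A minimal sketch of this suggestion, assuming a wrapper shaped like the FileStoreDataChannel in the diff above. `SyncingFileChannel` and `writeThenClose` are hypothetical names introduced for illustration, and the Ratis DataChannel interface is left out so the example stays self-contained. It may differ from what the PR finally merged.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical stand-in for FileStoreDataChannel; it does not implement the Ratis interface.
class SyncingFileChannel implements AutoCloseable {
  private final RandomAccessFile file;

  SyncingFileChannel(RandomAccessFile file) {
    this.file = file;
  }

  int write(ByteBuffer src) throws IOException {
    // getChannel() always returns the same channel object for a given RandomAccessFile.
    return file.getChannel().write(src);
  }

  boolean isOpen() {
    return file.getChannel().isOpen();
  }

  @Override
  public void close() throws IOException {
    final FileChannel channel = file.getChannel();
    try {
      if (channel.isOpen()) {
        // Flush file content and metadata to the storage device before closing.
        channel.force(true);
      }
    } finally {
      // Closing the RandomAccessFile also closes its associated channel.
      file.close();
    }
  }

  // Demo helper (hypothetical): writes data to a temp file, closes the channel,
  // and returns the resulting file length.
  static long writeThenClose(byte[] data) {
    try {
      final File tmp = File.createTempFile("filestore-sketch", ".bin");
      tmp.deleteOnExit();
      try (SyncingFileChannel out = new SyncingFileChannel(new RandomAccessFile(tmp, "rw"))) {
        final ByteBuffer buf = ByteBuffer.wrap(data);
        while (buf.hasRemaining()) {
          out.write(buf);
        }
      }
      return tmp.length();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Putting the force call inside close() means callers cannot forget it, at the cost of a synchronous disk flush on every close.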







[GitHub] [incubator-ratis] amaliujia commented on pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-733438963


   Thank you! Will update based on current comments.





[GitHub] [incubator-ratis] amaliujia edited a comment on pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia edited a comment on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-736130756


   I have identified the reason for failing to apply the transaction: https://github.com/apache/incubator-ratis/blob/ebabdbf570b50af0d868a5f4e5acf6fe5adc4876/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java#L94
   
   The request in the STREAM header is sent to the leader and hits this line. Because we now allow a customized message in that request, this line also needs to be updated to parse the proper proto.
   
   ----------------
   
   Actually no... the `StreamWriteRequestHeaderProto` is added to the example proto, so this function is supposed to be changed.
   
   I am guessing that this request is not the request in the stream header, but I am still checking.





[GitHub] [incubator-ratis] amaliujia commented on pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-736153313


   @szetszwo the code is updated. For the test, I added one more verification and then realized that the transaction failed to commit:
   ```
       DataStreamReply reply = dataStreamOutput.closeAsync().join();
       Assert.assertTrue(reply.isSuccess());
   ``` 





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533952698



##########
File path: ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.DataStreamTestUtils;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final Logger LOG = LoggerFactory.getLogger(FileStoreStreamingBaseTest.class);
+
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        FileStoreStateMachine.class, StateMachine.class);
+    ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
+        new File(getClassTestDir(), "filestore"));
+  }
+
+  static final int NUM_PEERS = 3;
+
+  @Test
+  public void testFileStoreStream() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_PEERS);
+    cluster.start();
+    RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_PEERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    final CheckedSupplier<FileStoreClient, IOException> newClient =
+        () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+    // TODO: configurable buffer size
+    final int bufferSize = 10_000;
+    testSingleFile("foo", SizeInBytes.valueOf("2M"), bufferSize, newClient, leader);
+
+    cluster.shutdown();
+  }
+
+  private void testSingleFile(
+      String path, SizeInBytes fileLength, int bufferSize, CheckedSupplier<FileStoreClient,
+      IOException> newClient, RaftServerImpl leader)
+      throws Exception {
+    LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
+    final int size = fileLength.getSizeInt();
+    FileStoreClient client = newClient.get();

Review comment:
       try (FileStoreClient client = newClient.get()) {
   ...
   }







[GitHub] [incubator-ratis] szetszwo edited a comment on pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo edited a comment on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-735040605


    (1) sounds good.   Then, the headerContent becomes the message (currently null) in the RaftClientRequest.





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534534617



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);

Review comment:
       In fact, because I now use `Path resolve(Path relative) throws IOException`, the order of the exceptions no longer holds. I have merged everything into a single catch of IOException.
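The merged handling can be sketched as follows. This is an illustration under assumptions, not the PR's code: `StreamCommitSketch` and `lengthMatches` are hypothetical names, and a plain Boolean stands in for the StreamWriteReplyProto reply. A single catch is enough because FileNotFoundException is a subclass of IOException.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper; a Boolean result stands in for StreamWriteReplyProto.
class StreamCommitSketch {
  // Completes with true if the file length equals bytesWritten, false otherwise.
  static CompletableFuture<Boolean> lengthMatches(File file, long bytesWritten) {
    return CompletableFuture.supplyAsync(() -> {
      // try-with-resources closes the read-only handle even when length() throws.
      try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
        return raf.length() == bytesWritten;
      } catch (IOException e) {
        // FileNotFoundException extends IOException, so this one catch covers both cases.
        throw new CompletionException(
            "Failed to commit stream write on file " + file + ", bytesWritten=" + bytesWritten, e);
      }
    });
  }
}
```

The try-with-resources form also closes the RandomAccessFile, which the quoted streamCommit hunk never does.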







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r535745234



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +158,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final FileStoreRequestProto proto;
+    try {
+      proto = FileStoreRequestProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getStream().getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(entry, stream);
+    return CompletableFuture.completedFuture(null);

Review comment:
       OK. I ended up checking only the following in `link()`:
   1. the stream is null;
   2. the stream is not closed (i.e. still open).
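The two checks could look roughly like the sketch below. The `Channel` and `Stream` interfaces are hypothetical minimal stand-ins for the Ratis DataChannel and DataStream types, and `entry` is a plain Object rather than a LogEntryProto, so this only illustrates the control flow.

```java
import java.util.concurrent.CompletableFuture;

class LinkChecks {
  // Hypothetical minimal stand-ins for the Ratis DataChannel and DataStream types.
  interface Channel {
    boolean isOpen();
  }

  interface Stream {
    Channel getDataChannel();
  }

  // Returns a future that has already failed with the given cause.
  static <T> CompletableFuture<T> failed(Throwable cause) {
    final CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(cause);
    return future;
  }

  static CompletableFuture<Void> link(Stream stream, Object entry) {
    // Check 1: a stream must exist for this log entry.
    if (stream == null) {
      return failed(new IllegalStateException("Null stream: entry=" + entry));
    }
    // Check 2: the stream must already be closed before it is linked.
    if (stream.getDataChannel().isOpen()) {
      return failed(new IllegalStateException("Stream is not closed: entry=" + entry));
    }
    return CompletableFuture.completedFuture(null);
  }
}
```

Returning a failed future instead of throwing keeps the error on the same asynchronous path as a successful link.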







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534555533



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -40,14 +41,18 @@
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class FileStoreStateMachine extends BaseStateMachine {
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+  private final ConcurrentMap<LogEntryProto, DataStream> dataStreamToLogEntryMap = new ConcurrentHashMap<>();

Review comment:
       Yes, I see the discussion. Because we have the path in the header, dataStreamToLogEntryMap is never used, so I think we do not need it here; you can add a comment in the link method. @szetszwo What do you think?







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533427701



##########
File path: ratis-proto/src/main/proto/Examples.proto
##########
@@ -26,6 +26,7 @@ message FileStoreRequestProto {
     WriteRequestHeaderProto writeHeader = 1;
     WriteRequestProto write = 2;
     DeleteRequestProto delete = 3;
+    StreamWriteRequestHeaderProto streamWriteHeader = 4;

Review comment:
       Let's rename it to "stream".







[GitHub] [incubator-ratis] szetszwo edited a comment on pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo edited a comment on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-733402768


   Consider the existing code for async:
   ```
   //FileStoreClient
     private static <OUTPUT, THROWABLE extends Throwable> OUTPUT writeImpl(
         CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction,
         String path, long offset, boolean close, ByteBuffer data)
         throws THROWABLE {
       final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
           .setPath(ProtoUtils.toByteString(path))
           .setOffset(offset)
           .setLength(data.position())
           .setClose(close);
   
       final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
           .setHeader(header)
           .setData(ByteString.copyFrom(data));
   
       final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setWrite(write).build();
       return sendFunction.apply(request.toByteString());
     }
   ```
   The idea is to define the streaming message format as below:
   ```
   <header-length><header><data>
   ```
   The entire message could be big, say 100MB.  We divide it into chunks, say 1MB, and send them using DataStreamOutput.writeAsync.
   
   We send a WriteRequestHeaderProto per message in Async.  In Streaming, we only need to send a header (say StreamRequestHeaderProto) once for the entire stream since the entire stream is logically a single message.  The StreamRequestHeaderProto has path and data length.
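The framing idea above can be sketched in plain Java. This is an illustration under assumptions rather than the Ratis wire format: the 4-byte length prefix, the class `StreamFraming`, and its methods are all hypothetical, and the header is treated as opaque bytes (in the example it would be a serialized proto).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class StreamFraming {
  // Builds <header-length><header><data>; the 4-byte int length prefix is an assumption.
  static ByteBuffer frame(byte[] header, byte[] data) {
    final ByteBuffer message = ByteBuffer.allocate(Integer.BYTES + header.length + data.length);
    message.putInt(header.length).put(header).put(data);
    message.flip();
    return message;
  }

  // Splits the message into read-only chunks of at most chunkSize bytes without copying.
  static List<ByteBuffer> chunks(ByteBuffer message, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive: " + chunkSize);
    }
    final List<ByteBuffer> result = new ArrayList<>();
    final ByteBuffer view = message.asReadOnlyBuffer();
    while (view.hasRemaining()) {
      final int length = Math.min(chunkSize, view.remaining());
      final ByteBuffer chunk = view.slice();
      chunk.limit(length);
      result.add(chunk);
      view.position(view.position() + length);
    }
    return result;
  }

  // Receiver side: reads the header back from a reassembled message as UTF-8 text.
  static String readHeader(ByteBuffer message) {
    final ByteBuffer view = message.asReadOnlyBuffer();
    final byte[] header = new byte[view.getInt()];
    view.get(header);
    return new String(header, StandardCharsets.UTF_8);
  }
}
```

Each chunk would then be handed to one writeAsync call. Only the first chunk carries the header, which is why the stream needs a single header where the async path sends one per write.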





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r535736101



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +158,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final FileStoreRequestProto proto;
+    try {
+      proto = FileStoreRequestProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getStream().getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(entry, stream);
+    return CompletableFuture.completedFuture(null);

Review comment:
       @amaliujia Use `!stream.getDataChannel().isOpen()` to check whether the stream is already closed.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533935050



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);
+      }
+    });
+  }
+
+  public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create " + p, e);
+      }
+    }, writer);
+  }
+
+  private Path buildFullPath(String relative) {
+    final Path root = getRoot();
+    final Path relativePath = Paths.get(relative).normalize();
+    return root.resolve(relativePath).normalize().toAbsolutePath();
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private final RandomAccessFile randomAccessFile;
+    private final FileChannel fileChannel;

Review comment:
       We do not need fileChannel, use randomAccessFile.getChannel() directly.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r530010628



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +160,83 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final StateMachineDataChannel dataChannel;
+
+    LocalStream(String path) {
+      final Path root = files.getRoot();
+      final Path relative = Paths.get(path).normalize();
+      final Path full = root.resolve(relative).normalize().toAbsolutePath();
+      dataChannel = new FileStoreStateMachineDataChannel(full);

Review comment:
       I will move this part to FileStore.







[GitHub] [incubator-ratis] amaliujia commented on pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-736130756


   I have identified the reason for failing to apply the transaction: https://github.com/apache/incubator-ratis/blob/ebabdbf570b50af0d868a5f4e5acf6fe5adc4876/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java#L94
   
   The request in the STREAM header is sent to the leader and hits this line. Because we now allow a customized message in that request, this line also needs to be updated to parse the proper proto.





[GitHub] [incubator-ratis] amaliujia commented on pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-737024322


   @szetszwo PR updated. Can you take a look? The test is no longer flaky.
   
   





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533949027



##########
File path: ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.DataStreamTestUtils;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final Logger LOG = LoggerFactory.getLogger(FileStoreStreamingBaseTest.class);
+
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        FileStoreStateMachine.class, StateMachine.class);
+    ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
+        new File(getClassTestDir(), "filestore"));
+  }
+
+  static final int NUM_PEERS = 3;
+
+  @Test
+  public void testFileStoreStream() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_PEERS);
+    cluster.start();
+    RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_PEERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    final CheckedSupplier<FileStoreClient, IOException> newClient =
+        () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+    // TODO: configurable buffer size
+    final int bufferSize = 10_000;
+    testSingleFile("foo", SizeInBytes.valueOf("2M"), bufferSize, newClient, leader);
+
+    cluster.shutdown();
+  }
+
+  private void testSingleFile(
+      String path, SizeInBytes fileLength, int bufferSize, CheckedSupplier<FileStoreClient,
+      IOException> newClient, RaftServerImpl leader)

Review comment:
       The leader parameter is unused.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534576753



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +224,56 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        final Path full = resolve(normalize(p));
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (IOException e) {
+        throw new CompletionException("Failed to commit stream write on file " + p, e);

Review comment:
       Include bytesWritten in the message.
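
A minimal sketch of what such a message could look like. The helper `commitFailure` and the class name are hypothetical and not part of the PR; only the message text prefix comes from the diff above.

```java
import java.io.IOException;
import java.util.concurrent.CompletionException;

final class StreamCommitMessages {
  // Hypothetical helper (not in the PR): reports both the path and the
  // expected byte count so a length mismatch is easier to diagnose.
  static CompletionException commitFailure(String path, long bytesWritten, IOException cause) {
    return new CompletionException(
        "Failed to commit stream write on file " + path + ", bytesWritten=" + bytesWritten, cause);
  }

  public static void main(String[] args) {
    final CompletionException e = commitFailure("foo", 2097152L, new IOException("disk error"));
    System.out.println(e.getMessage());
  }
}
```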

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +158,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final FileStoreRequestProto proto;
+    try {
+      proto = FileStoreRequestProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getStream().getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(entry, stream);
+    return CompletableFuture.completedFuture(null);

Review comment:
       Check that the stream is already closed and that the lengths match.  If not, complete exceptionally.
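
A rough sketch of the requested check, under stated assumptions: `TrackedChannel` is a hypothetical stand-in for the stream's data channel (it is not a Ratis type), and `expectedLength` is assumed to be available from the log entry or request.

```java
import java.util.concurrent.CompletableFuture;

final class LinkCheck {
  /** Hypothetical stand-in for the stream's data channel; not a Ratis type. */
  interface TrackedChannel {
    boolean isOpen();
    long getBytesWritten();
  }

  // Completes normally only when the stream is closed and the lengths agree.
  static CompletableFuture<Void> link(TrackedChannel channel, long expectedLength) {
    if (channel == null) {
      return failed(new IllegalStateException("Null stream"));
    }
    if (channel.isOpen()) {
      return failed(new IllegalStateException("Stream is not yet closed"));
    }
    final long written = channel.getBytesWritten();
    if (written != expectedLength) {
      return failed(new IllegalStateException(
          "Length mismatch: written=" + written + ", expected=" + expectedLength));
    }
    return CompletableFuture.completedFuture(null);
  }

  // Builds a future that is already completed exceptionally.
  private static CompletableFuture<Void> failed(Throwable t) {
    final CompletableFuture<Void> f = new CompletableFuture<>();
    f.completeExceptionally(t);
    return f;
  }
}
```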

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -40,14 +41,18 @@
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class FileStoreStateMachine extends BaseStateMachine {
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+  private final ConcurrentMap<LogEntryProto, DataStream> dataStreamToLogEntryMap = new ConcurrentHashMap<>();

Review comment:
       Let's remove this map since it is currently not useful.




[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534451623



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);

Review comment:
       It will:
   
   `RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");` throws FileNotFoundException if the file does not exist, which matches the first catch clause.
   If the file does exist, `file.length();` throws IOException if anything goes wrong, which matches the second catch clause.
   
   The first catch clause that matches is the one that gets executed: https://stackoverflow.com/questions/10964882/order-of-catching-exceptions-in-java#:~:text=The%20order%20is%20whatever%20matches,is%20matched%20or%20none%20are.
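
The catch ordering described above can be illustrated with a small standalone sketch. The class and method names are made up for illustration. Because FileNotFoundException is a subclass of IOException, it has to be listed first; reversing the two clauses would not compile.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

final class CatchOrder {
  // Returns a short description of which branch handled the call.
  static String describe(File f) {
    try (RandomAccessFile file = new RandomAccessFile(f, "r")) {
      return "length=" + file.length();
    } catch (FileNotFoundException e) {
      // Thrown by the constructor when the file cannot be opened for reading.
      return "not found";
    } catch (IOException e) {
      // Any other I/O failure, for example from length() or close().
      return "io error";
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(new File("does-not-exist-" + System.nanoTime())));
  }
}
```

Note that the second clause also catches failures from `close()`, so its message cannot assume that `length()` was the failing call.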




[GitHub] [incubator-ratis] szetszwo merged pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo merged pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298


   


[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533933283



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);

Review comment:
       If an IOException happens, we cannot be sure the cause is "Failed to get length of ".




[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533935050



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);
+      }
+    });
+  }
+
+  public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create " + p, e);
+      }
+    }, writer);
+  }
+
+  private Path buildFullPath(String relative) {
+    final Path root = getRoot();
+    final Path relativePath = Paths.get(relative).normalize();
+    return root.resolve(relativePath).normalize().toAbsolutePath();
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private final RandomAccessFile randomAccessFile;
+    private final FileChannel fileChannel;

Review comment:
       We do not need the fileChannel field; use file.getChannel() directly.




[GitHub] [incubator-ratis] amaliujia commented on pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-736082327


   @szetszwo sorry for the delay on this PR.
   
   1. I am still not sure what `DataApi.link(..)` is supposed to do for `FileStoreStateMachine`. Any hints? Given a DataStream and a LogEntryProto, what operation is required to link these two objects?
   
   2. I updated the unit test and found that the transaction cannot be applied successfully. The debugger did not stop at `DataApi.link(..)`, so the cause might not be that I haven't implemented the `DataApi.link` API. I am still debugging why the transaction cannot be applied and will update when I find something. 


[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533936229



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -40,14 +41,18 @@
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class FileStoreStateMachine extends BaseStateMachine {
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+  private final ConcurrentMap<LogEntryProto, DataStream> dataStreamToLogEntryMap = new ConcurrentHashMap<>();

Review comment:
       It seems we only put entries into dataStreamToLogEntryMap but never read them.




[GitHub] [incubator-ratis] amaliujia commented on pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-733295325


   The basic idea so far is that FileStore should initialize a DataStream with a customized header request that specifies a file path.
   
   Then, on the FileStoreStateMachine side, a channel is initialized from the file path in that header request.
   
   Then the stream writes go in. 


[GitHub] [incubator-ratis] szetszwo commented on pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-735052536


   > Meanwhile, I found that the unit test I added is flaky, I am still working on understanding the cause of the flakyness.
   
   It needs to implement DataApi.link(..).


[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533943851



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);
+      }
+    });
+  }
+
+  public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create " + p, e);
+      }
+    }, writer);
+  }
+
+  private Path buildFullPath(String relative) {
+    final Path root = getRoot();
+    final Path relativePath = Paths.get(relative).normalize();
+    return root.resolve(relativePath).normalize().toAbsolutePath();
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private final RandomAccessFile randomAccessFile;
+    private final FileChannel fileChannel;
+
+    FileStoreDataChannel(RandomAccessFile file) {
+      randomAccessFile = file;
+      fileChannel = file.getChannel();
+    }
+
+    @Override
+    public void force(boolean metadata) throws IOException {
+      fileChannel.force(metadata);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      return fileChannel.write(src);
+    }
+
+    @Override
+    public boolean isOpen() {
+      return fileChannel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+      fileChannel.close();
+      randomAccessFile.close();

Review comment:
       Before we close the file, we should call force to sync to disk. @szetszwo What do you think?
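
A minimal sketch of a channel that syncs before closing. It uses `java.nio.channels.WritableByteChannel` as a stand-in for the PR's `StateMachine.DataChannel`, and passing `true` to `force` (so that metadata is flushed as well) is an assumption, not something decided in this thread. It also avoids a separate channel field by calling `getChannel()` directly.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

final class SyncOnCloseChannel implements WritableByteChannel {
  private final RandomAccessFile file;

  SyncOnCloseChannel(RandomAccessFile file) {
    this.file = file;
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    // getChannel() returns the unique channel associated with the file.
    return file.getChannel().write(src);
  }

  @Override
  public boolean isOpen() {
    return file.getChannel().isOpen();
  }

  @Override
  public void close() throws IOException {
    final FileChannel channel = file.getChannel();
    if (channel.isOpen()) {
      // Flush content and metadata to the storage device before closing.
      channel.force(true);
    }
    file.close();
  }
}
```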




[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533935393



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);
+      }
+    });
+  }
+
+  public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create " + p, e);
+      }
+    }, writer);
+  }
+
+  private Path buildFullPath(String relative) {
+    final Path root = getRoot();
+    final Path relativePath = Paths.get(relative).normalize();
+    return root.resolve(relativePath).normalize().toAbsolutePath();
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private final RandomAccessFile randomAccessFile;
+    private final FileChannel fileChannel;
+
+    FileStoreDataChannel(RandomAccessFile file) {
+      randomAccessFile = file;
+      fileChannel = file.getChannel();
+    }
+
+    @Override
+    public void force(boolean metadata) throws IOException {
+      fileChannel.force(metadata);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      return fileChannel.write(src);
+    }
+
+    @Override
+    public boolean isOpen() {
+      return fileChannel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {

Review comment:
       Is randomAccessFile.close() enough? Not sure.
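
For reference, the Javadoc of `RandomAccessFile.close()` states that an associated channel is closed as well. A small standalone check of that behavior (class and method names are made up for illustration):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

final class CloseDemo {
  // Reports whether the channel is still open after only the file is closed.
  static boolean channelOpenAfterFileClose(File f) throws IOException {
    final RandomAccessFile raf = new RandomAccessFile(f, "rw");
    final FileChannel channel = raf.getChannel();
    raf.close();
    return channel.isOpen();
  }

  public static void main(String[] args) throws IOException {
    final File tmp = File.createTempFile("close-demo", ".bin");
    tmp.deleteOnExit();
    System.out.println("channel open after close: " + channelOpenAfterFileClose(tmp));
  }
}
```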




[GitHub] [incubator-ratis] szetszwo commented on pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-733402768


   Consider the existing code for async:
   ```
   //FileStoreClient
     private static <OUTPUT, THROWABLE extends Throwable> OUTPUT writeImpl(
         CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction,
         String path, long offset, boolean close, ByteBuffer data)
         throws THROWABLE {
       final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
           .setPath(ProtoUtils.toByteString(path))
           .setOffset(offset)
           .setLength(data.position())
           .setClose(close);
   
       final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
           .setHeader(header)
           .setData(ByteString.copyFrom(data));
   
       final FileStoreRequestProto request = FileStoreRequestProto.newBuilder().setWrite(write).build();
       return sendFunction.apply(request.toByteString());
     }
   ```
   The idea is to define the streaming message format as below:
   ```
   <header-length><header><data-length><data>
   ```
   The entire message could be big, say 100MB.  We divide it into chunks, say 1MB each, and send them using DataStreamOutput.writeAsync.
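
A sketch of the framing and chunking idea, under stated assumptions: the length prefixes are taken to be 4-byte big-endian integers (the format above does not fix the width), and `StreamFraming`, `frame` and `chunk` are illustrative names. Each returned chunk would then be handed to `DataStreamOutput.writeAsync`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class StreamFraming {
  // Encodes <header-length><header><data-length><data>.
  // Assumption: each length is a 4-byte big-endian int.
  static ByteBuffer frame(byte[] header, byte[] data) {
    final ByteBuffer buf = ByteBuffer.allocate(
        Integer.BYTES + header.length + Integer.BYTES + data.length);
    buf.putInt(header.length).put(header).putInt(data.length).put(data);
    buf.flip();
    return buf;
  }

  // Splits the message into read-only-position views of at most chunkSize bytes.
  // The position and limit of the given buffer are left untouched.
  static List<ByteBuffer> chunk(ByteBuffer message, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive: " + chunkSize);
    }
    final List<ByteBuffer> chunks = new ArrayList<>();
    final ByteBuffer view = message.duplicate();
    while (view.hasRemaining()) {
      final int length = Math.min(chunkSize, view.remaining());
      final ByteBuffer slice = view.slice();
      slice.limit(length);
      chunks.add(slice);
      view.position(view.position() + length);
    }
    return chunks;
  }
}
```

The slices share the backing storage of the framed message, so no extra copy is made per chunk.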
   


[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533932418



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);
+      }
+    });
+  }
+
+  public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create " + p, e);
+      }
+    }, writer);
+  }
+
+  private Path buildFullPath(String relative) {

Review comment:
       This looks like a duplicate of these two methods.
   ```
     static Path normalize(String path) {
       Objects.requireNonNull(path, "path == null");
       return Paths.get(path).normalize();
     }
   
     Path resolve(Path relative) throws IOException {
       final Path root = getRoot();
       final Path full = root.resolve(relative).normalize().toAbsolutePath();
       if (full.equals(root)) {
         throw new IOException("The file path " + relative + " resolved to " + full
             + " is the root directory " + root);
       } else if (!full.startsWith(root)) {
         throw new IOException("The file path " + relative + " resolved to " + full
             + " is not a sub-path under root directory " + root);
       }
       return full;
     }
   ```
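
To illustrate why reusing the two existing methods matters: unlike `buildFullPath`, they reject paths that resolve to the root or escape it. The following is a standalone sketch that mirrors the quoted logic; `PathResolver` is a hypothetical class, and the root is passed in rather than obtained from `getRoot()`.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;

final class PathResolver {
  private final Path root;

  PathResolver(Path root) {
    this.root = root.toAbsolutePath().normalize();
  }

  // Normalizes the relative path, resolves it under root, and rejects
  // anything that is the root itself or lies outside of it.
  Path resolve(String relative) throws IOException {
    Objects.requireNonNull(relative, "relative == null");
    final Path normalized = Paths.get(relative).normalize();
    final Path full = root.resolve(normalized).normalize().toAbsolutePath();
    if (full.equals(root)) {
      throw new IOException("The file path " + relative + " resolved to " + full
          + " is the root directory " + root);
    } else if (!full.startsWith(root)) {
      throw new IOException("The file path " + relative + " resolved to " + full
          + " is not a sub-path under root directory " + root);
    }
    return full;
  }
}
```

With this check in place, a request for a path such as `../escape` fails with an IOException instead of opening a file outside the store directory.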





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r530033067



##########
File path: ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final Logger LOG = LoggerFactory.getLogger(FileStoreStreamingBaseTest.class);
+
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        FileStoreStateMachine.class, StateMachine.class);
+    ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
+        new File(getClassTestDir(), "filestore"));
+  }
+
+  static final int NUM_PEERS = 3;
+  static final int MODULUS = 23;
+
+  @Test
+  public void testFileStoreStream() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_PEERS);
+    cluster.start();
+    RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_PEERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    final CheckedSupplier<FileStoreClient, IOException> newClient =
+        () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+    // TODO: configurable buffer size
+    final int bufferSize = 10_000;
+    testSingleFile("foo", SizeInBytes.valueOf("2M"), bufferSize, newClient, leader);
+
+    cluster.shutdown();
+  }
+
+  private void testSingleFile(
+      String path, SizeInBytes fileLength, int bufferSize, CheckedSupplier<FileStoreClient,
+      IOException> newClient, RaftServerImpl leader)
+      throws Exception {
+    LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
+    final int size = fileLength.getSizeInt();
+    FileStoreClient client = newClient.get();
+    DataStreamOutput dataStreamOutput = client.getStreamOutput(path);
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    final List<Integer> sizes = new ArrayList<>();
+
+    for(int offset = 0; offset < size; ) {
+      final int remaining = size - offset;
+      final int length = Math.min(remaining, bufferSize);
+      final boolean close = length == remaining;
+
+      LOG.trace("write {}, offset={}, length={}, close? {}",
+          path, offset, length, close);
+      final ByteBuffer bf = initBuffer(0, length);
+      futures.add(dataStreamOutput.writeAsync(bf, close));
+      sizes.add(length);
+      offset += length;
+    }
+
+    // TODO: handle when any of the writeAsync has failed.
+    // check writeAsync requests
+    for (int i = 0; i < futures.size(); i++) {
+      final DataStreamReply reply = futures.get(i).join();
+      Assert.assertTrue(reply.isSuccess());
+      Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
+      Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? DataStreamPacketHeaderProto.Type.STREAM_DATA_SYNC : DataStreamPacketHeaderProto.Type.STREAM_DATA);
+    }
+
+    dataStreamOutput.closeAsync().join();
+  }
+
+  static ByteBuffer initBuffer(int offset, int size) {

Review comment:
       I think this test is mainly for testing performance. If so, we should first create a large file, for example 1GB, then use a MappedByteBuffer to read the file and call writeAsync(MappedByteBuffer). Calling allocateDirect and filling the DirectByteBuffer is not the actual scenario we use in Ozone. @szetszwo, what do you think?
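
For illustration, the mapped-file approach suggested above could look roughly like the sketch below. It uses only JDK classes; `MappedFileChunks` and `chunks` are hypothetical names that are not part of Ratis, and the call to `writeAsync` is left out, so each returned slice stands for one buffer that a test would pass to the stream.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Ratis): maps a file and slices it into chunks. */
final class MappedFileChunks {
  private MappedFileChunks() {}

  /** Maps the whole file read-only and returns slices of at most chunkSize bytes. */
  static List<ByteBuffer> chunks(Path file, int chunkSize) throws IOException {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive: " + chunkSize);
    }
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
      // A single mapping is limited to Integer.MAX_VALUE bytes;
      // a larger file would need several mappings.
      final MappedByteBuffer mapped =
          channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
      final List<ByteBuffer> result = new ArrayList<>();
      while (mapped.hasRemaining()) {
        final int length = Math.min(chunkSize, mapped.remaining());
        // slice() shares the mapped memory; no bytes are copied.
        final ByteBuffer chunk = mapped.slice();
        chunk.limit(length);
        result.add(chunk);
        mapped.position(mapped.position() + length);
      }
      // The mapping stays valid after the channel is closed.
      return result;
    }
  }
}
```

Each slice shares the mapped region, so the file content is not copied onto the Java heap before it is handed to the stream.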







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533782410



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +163,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final StreamWriteRequestHeaderProto proto;
+    try {
+      proto = StreamWriteRequestHeaderProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(stream, entry);

Review comment:
       I see. 
   
   Also, `write` should probably be a no-op in the streaming case (it is already implemented this way, which is nice).







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533943851



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +225,67 @@ public void close() {
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+        long len = file.length();
+        return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+      } catch (FileNotFoundException e) {
+        throw new CompletionException("Failed to find " + p, e);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to get length of " + p, e);
+      }
+    });
+  }
+
+  public CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
+    return CompletableFuture.supplyAsync(() -> {
+      final Path full = buildFullPath(p);
+      try {
+        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create " + p, e);
+      }
+    }, writer);
+  }
+
+  private Path buildFullPath(String relative) {
+    final Path root = getRoot();
+    final Path relativePath = Paths.get(relative).normalize();
+    return root.resolve(relativePath).normalize().toAbsolutePath();
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private final RandomAccessFile randomAccessFile;
+    private final FileChannel fileChannel;
+
+    FileStoreDataChannel(RandomAccessFile file) {
+      randomAccessFile = file;
+      fileChannel = file.getChannel();
+    }
+
+    @Override
+    public void force(boolean metadata) throws IOException {
+      fileChannel.force(metadata);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      return fileChannel.write(src);
+    }
+
+    @Override
+    public boolean isOpen() {
+      return fileChannel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+      fileChannel.close();
+      randomAccessFile.close();

Review comment:
       Before we close the file, we should call force to sync the data to disk. @szetszwo, what do you think?
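
A minimal sketch of the force-before-close idea, assuming a plain JDK `WritableByteChannel` in place of `StateMachine.DataChannel`. The class name `SyncOnCloseChannel` is hypothetical and only illustrates the ordering of `force` and `close`.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

/** Hypothetical stand-in for FileStoreDataChannel that uses JDK types only. */
final class SyncOnCloseChannel implements WritableByteChannel {
  private final RandomAccessFile file;
  private final FileChannel channel;

  SyncOnCloseChannel(RandomAccessFile file) {
    this.file = file;
    this.channel = file.getChannel();
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    return channel.write(src);
  }

  @Override
  public boolean isOpen() {
    return channel.isOpen();
  }

  @Override
  public void close() throws IOException {
    try {
      if (channel.isOpen()) {
        // Flush file content and metadata to the storage device first.
        channel.force(true);
      }
    } finally {
      // Closing the RandomAccessFile also closes its associated channel.
      file.close();
    }
  }
}
```

The `isOpen` check keeps a second call to `close()` from failing, since `force` throws on a channel that is already closed.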







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534438075



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -40,14 +41,18 @@
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class FileStoreStateMachine extends BaseStateMachine {
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+  private final ConcurrentMap<LogEntryProto, DataStream> dataStreamToLogEntryMap = new ConcurrentHashMap<>();

Review comment:
       See discussion in https://github.com/apache/incubator-ratis/pull/298#discussion_r533447995.
   
   Basically, I am not sure whether my current implementation is enough. If it is, I can remove link() and this data structure.







[GitHub] [incubator-ratis] amaliujia commented on pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-735045358


   @szetszwo  PR updated. Can you take a look? I am sure this PR will need more updates :)
   
   Meanwhile, I found that the unit test I added is flaky. I am still working on understanding the cause of the flakiness.





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r531966544



##########
File path: ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
##########
@@ -37,4 +39,7 @@
 public interface DataStreamApi {
   /** Create a stream to write data. */
   DataStreamOutput stream();
+
+  /** Create a stream by providing a customized header message. */
+  DataStreamOutput stream(ByteBuffer headerContent);

Review comment:
       Let's call the parameter headerMessage.

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+
   @Override
   public void close() {
     writer.shutdownNow();
     committer.shutdownNow();
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  public FileStoreDataChannel createDataChannel(String p) {
+    final Path root = getRoot();
+    final Path relative = Paths.get(p).normalize();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    return new FileStoreDataChannel(full);
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private OutputStream out;
+    private final WritableByteChannel channel;

Review comment:
       Change them to RandomAccessFile and FileChannel.

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+
   @Override
   public void close() {
     writer.shutdownNow();
     committer.shutdownNow();
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  public FileStoreDataChannel createDataChannel(String p) {
+    final Path root = getRoot();
+    final Path relative = Paths.get(p).normalize();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    return new FileStoreDataChannel(full);
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private OutputStream out;
+    private final WritableByteChannel channel;
+    FileStoreDataChannel(Path path) {
+      try {
+        this.out = FileUtils.createNewFile(path);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      this.channel = Channels.newChannel(out);
+    }
+
+    @Override
+    public void force(boolean metadata) throws IOException {
+
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      return channel.write(src);
+    }
+
+    @Override
+    public boolean isOpen() {
+      return channel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+      channel.close();

Review comment:
       Close the RandomAccessFile.

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +154,46 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(String path) {

Review comment:
       Just pass DataChannel.

##########
File path: ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -184,6 +186,14 @@ public DataStreamOutputRpc stream(RaftClientRequest request) {
     return new DataStreamOutputImpl(request);
   }
 
+  @Override
+  public DataStreamOutputRpc stream(ByteBuffer headerContent) {
+    ByteString payload = ByteString.copyFrom(headerContent);

Review comment:
       Use Optional to support null:
   ```
       final Message m = Optional.ofNullable(headerContent).map(ByteString::copyFrom).map(Message::valueOf).orElse(null);
   ```
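
The same null-tolerant pattern can be tried out with JDK classes only. In the sketch below, `NullableHeader` and `decode` are hypothetical names, and a UTF-8 string stands in for the Ratis `Message`; the point is only that a null header flows through the `Optional` chain and comes out as null.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical example (not Ratis code): a header that may be null. */
final class NullableHeader {
  private NullableHeader() {}

  /** Returns the header decoded as UTF-8, or null when no header was supplied. */
  static String decode(ByteBuffer headerMessage) {
    return Optional.ofNullable(headerMessage)
        // Decode a read-only view so that the caller's position is not changed.
        .map(ByteBuffer::asReadOnlyBuffer)
        .map(StandardCharsets.UTF_8::decode)
        .map(CharBuffer::toString)
        .orElse(null);
  }
}
```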

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+
   @Override
   public void close() {
     writer.shutdownNow();
     committer.shutdownNow();
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  public FileStoreDataChannel createDataChannel(String p) {
+    final Path root = getRoot();
+    final Path relative = Paths.get(p).normalize();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    return new FileStoreDataChannel(full);
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private OutputStream out;
+    private final WritableByteChannel channel;
+    FileStoreDataChannel(Path path) {
+      try {
+        this.out = FileUtils.createNewFile(path);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      this.channel = Channels.newChannel(out);
+    }
+
+    @Override
+    public void force(boolean metadata) throws IOException {
+

Review comment:
       Call FileChannel.force(..)

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+
   @Override
   public void close() {
     writer.shutdownNow();
     committer.shutdownNow();
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  public FileStoreDataChannel createDataChannel(String p) {
+    final Path root = getRoot();
+    final Path relative = Paths.get(p).normalize();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    return new FileStoreDataChannel(full);
+  }

Review comment:
       Make it async
   ```
     CompletableFuture<FileStoreDataChannel> createDataChannel(String p) {
       return CompletableFuture.supplyAsync(() -> {
         final Path root = getRoot();
         final Path relative = Paths.get(p).normalize();
         final Path full = root.resolve(relative).normalize().toAbsolutePath();
         try {
           return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
         } catch (IOException e) {
           throw new CompletionException("Failed to create " + p, e);
         }
       }, writer);
     }
   ```
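
As a standalone illustration of this pattern, the sketch below opens a file on a caller-supplied executor and reports failures through the returned future. `AsyncOpen` and `open` are hypothetical names; the executor parameter plays the role of the `writer` executor in the suggestion. Note that `RandomAccessFile` accepts only the modes "r", "rw", "rws" and "rwd", so the sketch uses "rw".

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

/** Hypothetical example (not Ratis code): open a file asynchronously. */
final class AsyncOpen {
  private AsyncOpen() {}

  /** Opens the file for reading and writing on the given executor. */
  static CompletableFuture<RandomAccessFile> open(Path full, Executor writer) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        // "rw" creates the file if it does not exist yet.
        return new RandomAccessFile(full.toFile(), "rw");
      } catch (IOException e) {
        // A Supplier cannot throw checked exceptions, so wrap the failure.
        throw new CompletionException("Failed to create " + full, e);
      }
    }, writer);
  }
}
```

Callers see the failure when they call `join()` or attach a handler such as `exceptionally`; the original `IOException` is available as the cause.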

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+
   @Override
   public void close() {
     writer.shutdownNow();
     committer.shutdownNow();
     reader.shutdownNow();
     deleter.shutdownNow();
   }
+
+  public FileStoreDataChannel createDataChannel(String p) {
+    final Path root = getRoot();
+    final Path relative = Paths.get(p).normalize();
+    final Path full = root.resolve(relative).normalize().toAbsolutePath();
+    return new FileStoreDataChannel(full);
+  }
+
+  class FileStoreDataChannel implements StateMachine.DataChannel {
+    private OutputStream out;
+    private final WritableByteChannel channel;
+    FileStoreDataChannel(Path path) {
+      try {
+        this.out = FileUtils.createNewFile(path);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      this.channel = Channels.newChannel(out);
+    }

Review comment:
       Pass RandomAccessFile
   ```
       FileStoreDataChannel(RandomAccessFile raf) {
         this.raf = raf;
         this.channel = raf.getChannel();
       }
   ```

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -212,11 +217,52 @@ Path resolve(Path relative) throws IOException {
         : uc.submitWrite(offset, data, close, writer, getId(), index);
   }
 
+

Review comment:
       Review your own pull request and revert changes like this.

##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +154,46 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(String path) {
+      dataChannel = files.createDataChannel(path);
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final StreamWriteRequestHeaderProto proto;
+    try {
+      proto = StreamWriteRequestHeaderProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return CompletableFuture.supplyAsync(() -> {
+      return new LocalStream(proto.getPath().toStringUtf8());
+    });

Review comment:
       Change it to
   ```
       return files.createDataChannel(proto.getPath().toStringUtf8())
           .thenApply(LocalStream::new);
   ```
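
To show why the chained form is preferable, here is a small sketch with stand-in types. `Channel`, `WrappedStream`, `createChannel` and `stream` are hypothetical and only mirror the shape of `createDataChannel` and `LocalStream`; the wrapping step runs once the channel future completes, without a second `supplyAsync`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical example (not Ratis code): wrap an async result with thenApply. */
final class ThenApplyExample {
  private ThenApplyExample() {}

  /** Stand-in for the data channel type. */
  static final class Channel {
    final String path;

    Channel(String path) {
      this.path = path;
    }
  }

  /** Stand-in for LocalStream: wraps an already created channel. */
  static final class WrappedStream {
    final Channel channel;

    WrappedStream(Channel channel) {
      this.channel = channel;
    }
  }

  /** Stand-in for createDataChannel: produces the channel asynchronously. */
  static CompletableFuture<Channel> createChannel(String path) {
    return CompletableFuture.supplyAsync(() -> new Channel(path));
  }

  /** Chains the wrapping step onto the channel future. */
  static CompletableFuture<WrappedStream> stream(String path) {
    return createChannel(path).thenApply(WrappedStream::new);
  }
}
```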







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533951946



##########
File path: ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.DataStreamTestUtils;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final Logger LOG = LoggerFactory.getLogger(FileStoreStreamingBaseTest.class);
+
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        FileStoreStateMachine.class, StateMachine.class);
+    ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
+        new File(getClassTestDir(), "filestore"));
+  }
+
+  static final int NUM_PEERS = 3;
+
+  @Test
+  public void testFileStoreStream() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_PEERS);
+    cluster.start();
+    RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_PEERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    final CheckedSupplier<FileStoreClient, IOException> newClient =
+        () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+    // TODO: configurable buffer size
+    final int bufferSize = 10_000;
+    testSingleFile("foo", SizeInBytes.valueOf("2M"), bufferSize, newClient, leader);
+
+    cluster.shutdown();
+  }
+
+  private void testSingleFile(
+      String path, SizeInBytes fileLength, int bufferSize, CheckedSupplier<FileStoreClient,
+      IOException> newClient, RaftServerImpl leader)
+      throws Exception {
+    LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
+    final int size = fileLength.getSizeInt();
+    FileStoreClient client = newClient.get();
+    DataStreamOutput dataStreamOutput = client.getStreamOutput(path, size);
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    final List<Integer> sizes = new ArrayList<>();
+
+    for(int offset = 0; offset < size; ) {
+      final int remaining = size - offset;
+      final int length = Math.min(remaining, bufferSize);
+      final boolean close = length == remaining;
+
+      LOG.trace("write {}, offset={}, length={}, close? {}",
+          path, offset, length, close);
+      final ByteBuffer bf = DataStreamTestUtils.initBuffer(0, length);
+      futures.add(dataStreamOutput.writeAsync(bf, close));
+      sizes.add(length);
+      offset += length;
+    }
+

Review comment:
       Put the following code here.
   
   ```
       DataStreamReply reply = dataStreamOutput.closeAsync().join();
       Assert.assertTrue(reply.isSuccess());
   ```







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534596643



##########
File path: ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
##########
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.DataStreamTestUtils;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final Logger LOG = LoggerFactory.getLogger(FileStoreStreamingBaseTest.class);
+
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        FileStoreStateMachine.class, StateMachine.class);
+    ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
+        new File(getClassTestDir(), "filestore"));
+  }
+
+  static final int NUM_PEERS = 3;
+
+  @Test
+  public void testFileStoreStream() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_PEERS);
+    cluster.start();
+    RaftTestUtil.waitForLeader(cluster);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_PEERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    final CheckedSupplier<FileStoreClient, IOException> newClient =
+        () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+    // TODO: configurable buffer size
+    final int bufferSize = 10_000;
+    testSingleFile("foo", SizeInBytes.valueOf("2M"), bufferSize, newClient);
+
+    cluster.shutdown();
+  }
+
+  private void testSingleFile(
+      String path, SizeInBytes fileLength, int bufferSize, CheckedSupplier<FileStoreClient,
+      IOException> newClient)
+      throws Exception {
+    LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
+    final int size = fileLength.getSizeInt();
+    final DataStreamOutput dataStreamOutput;
+    try (FileStoreClient client = newClient.get()) {

Review comment:
       ```
   try (FileStoreClient client = newClient.get()) {
     final DataStreamOutput dataStreamOutput = client.getStreamOutput(path, size);
     final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
     final List<Integer> sizes = new ArrayList<>();
   
     for (int offset = 0; offset < size; ) {
       final int remaining = size - offset;
       final int length = Math.min(remaining, bufferSize);
       final boolean close = length == remaining;
   
       LOG.trace("write {}, offset={}, length={}, close? {}",
           path, offset, length, close);
       final ByteBuffer bf = DataStreamTestUtils.initBuffer(0, length);
       futures.add(dataStreamOutput.writeAsync(bf, close));
       sizes.add(length);
       offset += length;
     }
   
     DataStreamReply reply = dataStreamOutput.closeAsync().join();
     Assert.assertTrue(reply.isSuccess());
   
     // TODO: handle when any of the writeAsync has failed.
     // check writeAsync requests
     for (int i = 0; i < futures.size(); i++) {
       reply = futures.get(i).join();
       Assert.assertTrue(reply.isSuccess());
       Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
       final DataStreamPacketHeaderProto.Type expected = i == futures.size() - 1
           ? DataStreamPacketHeaderProto.Type.STREAM_DATA_SYNC
           : DataStreamPacketHeaderProto.Type.STREAM_DATA;
       Assert.assertEquals(expected, reply.getType());
     }
   }
   ```







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533009549



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +163,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final StreamWriteRequestHeaderProto proto;
+    try {
+      proto = StreamWriteRequestHeaderProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(stream, entry);

Review comment:
       My current understanding is that `link` only preserves the mapping between the DataStream and the LogEntryProto.
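   
   To make that reading concrete, here is a minimal, library-free sketch of a `link` that does nothing but record the association. `LinkRegistrySketch` and its type parameters are hypothetical stand-ins, not Ratis classes; the null check mirrors the one in the diff above, and rejecting a null entry is an assumption added for the sketch.
   
   ```java
   import java.util.Map;
   import java.util.Objects;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   
   /** Hypothetical stand-in: S plays the role of a data stream, E the role of a log entry. */
   final class LinkRegistrySketch<S, E> {
     // Thread-safe map from stream to the entry it was linked with.
     private final Map<S, E> linked = new ConcurrentHashMap<>();
   
     /** Records the stream-to-entry mapping; a null stream fails the returned future. */
     CompletableFuture<Void> link(S stream, E entry) {
       if (stream == null) {
         final CompletableFuture<Void> failed = new CompletableFuture<>();
         failed.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
         return failed;
       }
       // Assumption for this sketch: a null entry is a programming error.
       linked.put(stream, Objects.requireNonNull(entry, "entry"));
       return CompletableFuture.completedFuture(null);
     }
   
     /** Returns the entry linked to the given stream, or null if it was never linked. */
     E getEntry(S stream) {
       return linked.get(stream);
     }
   }
   ```
   
   With this shape, any validation (file exists, length matches) would have to happen elsewhere, for example when the entry is applied.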







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534596643



##########
File path: ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreStreamingBaseTest.java
##########
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.filestore;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.DataStreamTestUtils;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public abstract class FileStoreStreamingBaseTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final Logger LOG = LoggerFactory.getLogger(FileStoreStreamingBaseTest.class);
+
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        FileStoreStateMachine.class, StateMachine.class);
+    ConfUtils.setFile(p::setFile, FileStoreCommon.STATEMACHINE_DIR_KEY,
+        new File(getClassTestDir(), "filestore"));
+  }
+
+  static final int NUM_PEERS = 3;
+
+  @Test
+  public void testFileStoreStream() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_PEERS);
+    cluster.start();
+    RaftTestUtil.waitForLeader(cluster);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_PEERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    final CheckedSupplier<FileStoreClient, IOException> newClient =
+        () -> new FileStoreClient(cluster.getGroup(), getProperties(), raftPeer);
+    // TODO: configurable buffer size
+    final int bufferSize = 10_000;
+    testSingleFile("foo", SizeInBytes.valueOf("2M"), bufferSize, newClient);
+
+    cluster.shutdown();
+  }
+
+  private void testSingleFile(
+      String path, SizeInBytes fileLength, int bufferSize, CheckedSupplier<FileStoreClient,
+      IOException> newClient)
+      throws Exception {
+    LOG.info("runTestSingleFile with path={}, fileLength={}", path, fileLength);
+    final int size = fileLength.getSizeInt();
+    final DataStreamOutput dataStreamOutput;
+    try (FileStoreClient client = newClient.get()) {

Review comment:
       `try` should contain all related code.
   ```
   try (FileStoreClient client = newClient.get()) {
       final DataStreamOutput dataStreamOutput = client.getStreamOutput(path, size);
       final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
       final List<Integer> sizes = new ArrayList<>();
   
       for(int offset = 0; offset < size; ) {
         final int remaining = size - offset;
         final int length = Math.min(remaining, bufferSize);
         final boolean close = length == remaining;
   
         LOG.trace("write {}, offset={}, length={}, close? {}",
             path, offset, length, close);
         final ByteBuffer bf = DataStreamTestUtils.initBuffer(0, length);
         futures.add(dataStreamOutput.writeAsync(bf, close));
         sizes.add(length);
         offset += length;
       }
   
       DataStreamReply reply = dataStreamOutput.closeAsync().join();
       Assert.assertTrue(reply.isSuccess());
   
       // TODO: handle when any of the writeAsync has failed.
       // check writeAsync requests
       for (int i = 0; i < futures.size(); i++) {
         reply = futures.get(i).join();
         Assert.assertTrue(reply.isSuccess());
         Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
         Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? DataStreamPacketHeaderProto.Type.STREAM_DATA_SYNC : DataStreamPacketHeaderProto.Type.STREAM_DATA);
       }
   }
   ```
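   
   The reason for keeping everything inside `try` can be shown with a small, library-free sketch. `FakeClient` and `FakeStream` are hypothetical stand-ins (not Ratis classes), and the sketch assumes that a stream becomes unusable once the client that created it is closed. `chunkLengths` reproduces the offset/length arithmetic of the loop above.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical stand-ins for illustration only; these are not Ratis classes. */
   final class TryScopeSketch {
     /** A closeable client that hands out streams tied to its own lifetime. */
     static final class FakeClient implements AutoCloseable {
       private boolean open = true;
   
       FakeStream newStream() {
         return new FakeStream(this);
       }
   
       boolean isOpen() {
         return open;
       }
   
       @Override
       public void close() {
         open = false;
       }
     }
   
     /** A stream that refuses writes once its owning client has been closed. */
     static final class FakeStream {
       private final FakeClient owner;
       private long bytesWritten = 0;
   
       FakeStream(FakeClient owner) {
         this.owner = owner;
       }
   
       void write(int length) {
         if (!owner.isOpen()) {
           throw new IllegalStateException("client already closed");
         }
         bytesWritten += length;
       }
   
       long getBytesWritten() {
         return bytesWritten;
       }
     }
   
     /** Splits size into chunk lengths of at most bufferSize each. */
     static List<Integer> chunkLengths(int size, int bufferSize) {
       if (bufferSize <= 0) {
         throw new IllegalArgumentException("bufferSize must be positive");
       }
       final List<Integer> lengths = new ArrayList<>();
       for (int offset = 0; offset < size; ) {
         final int length = Math.min(size - offset, bufferSize);
         lengths.add(length);
         offset += length;
       }
       return lengths;
     }
   
     /** Performs every write inside the try block, while the client is still open. */
     static long writeAll(int size, int bufferSize) {
       try (FakeClient client = new FakeClient()) {
         final FakeStream stream = client.newStream();
         for (int length : chunkLengths(size, bufferSize)) {
           stream.write(length);
         }
         return stream.getBytesWritten();
       }
     }
   }
   ```
   
   If the stream were obtained inside `try` but written to after the block, the first `write` in this sketch would throw, because the client has already been closed by then.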







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #298: RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534668692



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +158,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final FileStoreRequestProto proto;
+    try {
+      proto = FileStoreRequestProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getStream().getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(entry, stream);
+    return CompletableFuture.completedFuture(null);

Review comment:
       I did something similar in `applyTransaction` (checking whether the file exists and whether it has the right length).
   
   Do you think that is not enough?
   
   If so, in order to check whether this stream is closed and to check its length, I think we need to update the interface
   
   ```
     /**
      * For streaming state machine data.
      */
     interface DataStream {
       /** @return a channel for streaming state machine data. */
       DataChannel getDataChannel();
   
       /**
        * Clean up asynchronously this stream.
        *
        * When there is an error, this method is invoked to clean up the associated resources.
        * If this stream is not yet linked (see {@link DataApi#link(DataStream, LogEntryProto)}),
        * the state machine may choose to remove the data or to keep the data internally for future recovery.
        * If this stream is already linked, the data must not be removed.
        *
        * @return a future for the cleanup task.
        */
       CompletableFuture<?> cleanUp();
     }
   ```
   to expose that information.
   
   The least I can do with the current interface is to check whether the channel in the data stream has been closed.
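   
   A rough sketch of that minimal check, assuming the data channel is a `java.nio.channels.Channel` so that `isOpen()` is available. `ClosedChannelCheckSketch` and `requireClosed` are hypothetical names, and a plain `FileChannel` on a temporary file stands in for the real data channel.
   
   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channel;
   import java.nio.channels.FileChannel;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;
   import java.util.concurrent.CompletableFuture;
   
   /** Hypothetical helper for illustration only; not part of Ratis. */
   final class ClosedChannelCheckSketch {
     /** Completes normally only if the channel is already closed; fails the future otherwise. */
     static CompletableFuture<Void> requireClosed(Channel channel) {
       final CompletableFuture<Void> future = new CompletableFuture<>();
       if (channel.isOpen()) {
         future.completeExceptionally(new IllegalStateException("channel is still open"));
       } else {
         future.complete(null);
       }
       return future;
     }
   
     /** Writes three bytes to a temp file and runs the check before and after closing. */
     static boolean demo() {
       try {
         final Path tmp = Files.createTempFile("stream-sketch", ".bin");
         try {
           final FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE);
           final boolean openRejected;
           try {
             channel.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
             openRejected = requireClosed(channel).isCompletedExceptionally();
           } finally {
             channel.close();
           }
           final boolean closedAccepted = !requireClosed(channel).isCompletedExceptionally();
           // The length can only be checked through the file here, not through the channel.
           return openRejected && closedAccepted && Files.size(tmp) == 3L;
         } finally {
           Files.deleteIfExists(tmp);
         }
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
     }
   }
   ```
   
   Note that the length check in `demo` goes through the file system rather than the channel, which is the kind of information the current interface does not expose.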
   
   @szetszwo  what do you think?







[GitHub] [incubator-ratis] szetszwo commented on pull request #298: [WIP] RATIS-1111. Change the FileStore example to use Streaming

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#issuecomment-735040605


   #1 sounds good. Then the headerContent becomes the message (currently null) in the RaftClientRequest.

