You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/06 15:45:06 UTC

[ozone] 01/35: HDDS-5366. [Ozone-Streaming] Implement stream method to ContainerStateMachine. (#2358). Contributed by mingchao zhao

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 553346e7d22137cd3646c930a4d7984158122af3
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Jun 23 23:20:27 2021 +0800

    HDDS-5366.  [Ozone-Streaming] Implement stream method to ContainerStateMachine. (#2358).  Contributed by mingchao zhao
---
 .../server/ratis/ContainerStateMachine.java        | 25 ++++++++++
 .../common/transport/server/ratis/LocalStream.java | 50 +++++++++++++++++++
 .../transport/server/ratis/StreamDataChannel.java  | 57 ++++++++++++++++++++++
 3 files changed, 132 insertions(+)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 494cfe1b13..11095dcd7f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -25,10 +25,12 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collection;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -510,6 +512,29 @@ public class ContainerStateMachine extends BaseStateMachine {
     return raftFuture;
   }
 
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        ContainerCommandRequestProto requestProto =
+            getContainerCommandRequestProto(gid,
+                request.getMessage().getContent());
+        DispatcherContext context =
+            new DispatcherContext.Builder()
+                .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
+                .setContainer2BCSIDMap(container2BCSIDMap)
+                .build();
+
+        ContainerCommandResponseProto response = runCommand(
+            requestProto, context);
+        String path = response.getMessage();
+        return new LocalStream(new StreamDataChannel(Paths.get(path)));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create data stream", e);
+      }
+    }, executor);
+  }
+
   private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
     int i = (int)(req.getBlockID().getLocalID() % chunkExecutors.size());
     return chunkExecutors.get(i);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
new file mode 100644
index 0000000000..baae013966
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+class LocalStream implements StateMachine.DataStream {
+  private final StateMachine.DataChannel dataChannel;
+
+  LocalStream(StateMachine.DataChannel dataChannel) {
+    this.dataChannel = dataChannel;
+  }
+
+  @Override
+  public StateMachine.DataChannel getDataChannel() {
+    return dataChannel;
+  }
+
+  @Override
+  public CompletableFuture<?> cleanUp() {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        dataChannel.close();
+        return true;
+      } catch (IOException e) {
+        throw new CompletionException("Failed to close data channel", e);
+      }
+    });
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
new file mode 100644
index 0000000000..3df66e26dc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+
+class StreamDataChannel implements StateMachine.DataChannel {
+  private final Path path;
+  private final RandomAccessFile randomAccessFile;
+
+  StreamDataChannel(Path path) throws FileNotFoundException {
+    this.path = path;
+    this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
+  }
+
+  @Override
+  public void force(boolean metadata) throws IOException {
+    randomAccessFile.getChannel().force(metadata);
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    return randomAccessFile.getChannel().write(src);
+  }
+
+  @Override
+  public boolean isOpen() {
+    return randomAccessFile.getChannel().isOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    randomAccessFile.close();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org