Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/08/06 07:56:12 UTC

[GitHub] [ozone] szetszwo commented on a change in pull request #2495: HDDS-5488. [Ozone-Streaming] Add a new BlockOutputStream/KeyOutputStream to support streaming api

szetszwo commented on a change in pull request #2495:
URL: https://github.com/apache/ozone/pull/2495#discussion_r684004522



##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DataStreamOutput.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+* This interface is for writing an output stream of ByteBuffers.
+* A DataStreamOutput accepts netty.buffer.ByteBufs and sends them to some sink.
+*/
+public interface DataStreamOutput extends Closeable {

Review comment:
       Let's call this ByteBufferStreamOutput to avoid the name conflict with Ratis' org.apache.ratis.client.api.DataStreamOutput.
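   A rough sketch of the renamed interface, assuming it also takes a java.nio.ByteBuffer as suggested in the comment on the ByteBuf import below (the exact signature here is illustrative, not final):
   ```
   import java.io.Closeable;
   import java.io.IOException;
   import java.nio.ByteBuffer;

   /**
    * This interface is for writing an output stream of ByteBuffers.
    * A ByteBufferStreamOutput accepts ByteBuffers and sends them to some sink.
    */
   public interface ByteBufferStreamOutput extends Closeable {
     void write(ByteBuffer b) throws IOException;
   }
   ```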

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
##########
@@ -0,0 +1,775 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
+
+/**
+ * An {@link OutputStream} used by the REST service in combination with the
+ * SCMClient to write the value of a key to a sequence
+ * of container chunks.  Writes are buffered locally and periodically written to
+ * the container as a new chunk.  In order to preserve the semantics that
+ * replacement of a pre-existing key is atomic, each instance of the stream has
+ * an internal unique identifier.  This unique identifier and a monotonically
+ * increasing chunk index form a composite key that is used as the chunk name.
+ * After all data is written, a putKey call creates or updates the corresponding
+ * container key, and this call includes the full list of chunks that make up
+ * the key data.  The list of chunks is updated all at once.  Therefore, a
+ * concurrent reader never can see an intermediate state in which different
+ * chunks of data from different versions of the key data are interleaved.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
+ */
+public class BlockDataStreamOutput extends OutputStream
+    implements org.apache.hadoop.hdds.scm.storage.DataStreamOutput {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockDataStreamOutput.class);
+  public static final String EXCEPTION_MSG =
+      "Unexpected Storage Container Exception: ";
+  private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = {};
+
+  private AtomicReference<BlockID> blockID;
+
+  private final BlockData.Builder containerBlockData;
+  private XceiverClientFactory xceiverClientFactory;
+  private XceiverClientRatis xceiverClient;
+  private OzoneClientConfig config;
+
+  private int chunkIndex;
+  private final AtomicLong chunkOffset = new AtomicLong();
+  private final BufferPool bufferPool;

Review comment:
       For zero buffer copying, BufferPool and ChunkBuffer should not be used anymore. We get the ByteBuf from the API and then pass it down directly.
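   A minimal sketch of the zero-copy path, assuming the Ratis DataStreamOutput.writeAsync(ByteBuffer) API and the out field of this class (illustrative only, not tested):
   ```
   // No BufferPool/ChunkBuffer: expose the readable region of the incoming
   // ByteBuf as a ByteBuffer view (nioBuffer() does not copy the bytes)
   // and hand it straight to the Ratis stream.
   private CompletableFuture<DataStreamReply> writeDirect(ByteBuf b) {
     return out.writeAsync(b.nioBuffer());
   }
   ```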

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
##########
@@ -0,0 +1,775 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
+
+/**
+ * An {@link OutputStream} used by the REST service in combination with the
+ * SCMClient to write the value of a key to a sequence
+ * of container chunks.  Writes are buffered locally and periodically written to
+ * the container as a new chunk.  In order to preserve the semantics that
+ * replacement of a pre-existing key is atomic, each instance of the stream has
+ * an internal unique identifier.  This unique identifier and a monotonically
+ * increasing chunk index form a composite key that is used as the chunk name.
+ * After all data is written, a putKey call creates or updates the corresponding
+ * container key, and this call includes the full list of chunks that make up
+ * the key data.  The list of chunks is updated all at once.  Therefore, a
+ * concurrent reader never can see an intermediate state in which different
+ * chunks of data from different versions of the key data are interleaved.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
+ */
+public class BlockDataStreamOutput extends OutputStream

Review comment:
       Let's not extend OutputStream here so that we can simplify the code. We may add it later if there is a need.
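   A sketch of the simplified declaration (the ByteBufferStreamOutput name assumes the rename suggested on the interface; method bodies are placeholders):
   ```
   import java.io.IOException;
   import java.nio.ByteBuffer;

   // Implement only the streaming interface; with no OutputStream
   // superclass, there is no single-byte write(int) path to emulate.
   public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     @Override
     public void write(ByteBuffer b) throws IOException {
       // stream the buffer to the datanodes
     }

     @Override
     public void close() throws IOException {
       // flush pending writes and release the client
     }
   }
   ```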

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DataStreamOutput.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import io.netty.buffer.ByteBuf;

Review comment:
       Let's use ByteBuffer instead of ByteBuf here since this is a client API. We don't want to force clients to use ByteBuf.
   For our internal code, we may use ByteBuf for its efficiency.
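   If the internal code still wants Netty buffers, the client-facing ByteBuffer can be wrapped without a copy; a small illustrative helper (name is hypothetical):
   ```
   import io.netty.buffer.ByteBuf;
   import io.netty.buffer.Unpooled;
   import java.nio.ByteBuffer;

   final class ByteBufAdapter {
     private ByteBufAdapter() {
     }

     // Unpooled.wrappedBuffer shares the buffer's memory instead of
     // copying it; the returned ByteBuf reads the bytes between the
     // ByteBuffer's position and limit.
     static ByteBuf wrap(ByteBuffer b) {
       return Unpooled.wrappedBuffer(b);
     }
   }
   ```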

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
##########
@@ -0,0 +1,775 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
+
+/**
+ * An {@link OutputStream} used by the REST service in combination with the
+ * SCMClient to write the value of a key to a sequence
+ * of container chunks.  Writes are buffered locally and periodically written to
+ * the container as a new chunk.  In order to preserve the semantics that
+ * replacement of a pre-existing key is atomic, each instance of the stream has
+ * an internal unique identifier.  This unique identifier and a monotonically
+ * increasing chunk index form a composite key that is used as the chunk name.
+ * After all data is written, a putKey call creates or updates the corresponding
+ * container key, and this call includes the full list of chunks that make up
+ * the key data.  The list of chunks is updated all at once.  Therefore, a
+ * concurrent reader never can see an intermediate state in which different
+ * chunks of data from different versions of the key data are interleaved.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
+ */
+public class BlockDataStreamOutput extends OutputStream
+    implements org.apache.hadoop.hdds.scm.storage.DataStreamOutput {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockDataStreamOutput.class);
+  public static final String EXCEPTION_MSG =
+      "Unexpected Storage Container Exception: ";
+  private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = {};
+
+  private AtomicReference<BlockID> blockID;
+
+  private final BlockData.Builder containerBlockData;
+  private XceiverClientFactory xceiverClientFactory;
+  private XceiverClientRatis xceiverClient;
+  private OzoneClientConfig config;
+
+  private int chunkIndex;
+  private final AtomicLong chunkOffset = new AtomicLong();
+  private final BufferPool bufferPool;
+  // The IOException will be set by response handling thread in case there is an
+  // exception received in the response. If the exception is set, the next
+  // request will fail upfront.
+  private final AtomicReference<IOException> ioException;
+  private final ExecutorService responseExecutor;
+
+  // the effective length of data flushed so far
+  private long totalDataFlushedLength;
+
+  // effective data write attempted so far for the block
+  private long writtenDataLength;
+
+  // List containing buffers for which the putBlock call will
+  // update the length in the datanodes. This list will just maintain
+  // references to the buffers in the BufferPool which will be cleared
+  // when the watchForCommit acknowledges a putBlock logIndex has been
+  // committed on all datanodes. This list will be a  place holder for buffers
+  // which got written between successive putBlock calls.
+  private List<ChunkBuffer> bufferList;
+
+  // This object will maintain the commitIndexes and byteBufferList in order
+  // Also, corresponding to the logIndex, the corresponding list of buffers will
+  // be released from the buffer pool.
+  private final CommitWatcher commitWatcher;
+
+  private final List<DatanodeDetails> failedServers;
+  private final Checksum checksum;
+
+  //number of buffers used before doing a flush/putBlock.
+  private int flushPeriod;
+  //bytes remaining to write in the current buffer.
+  private int currentBufferRemaining;
+  //current buffer allocated to write
+  private ChunkBuffer currentBuffer;
+  private final Token<? extends TokenIdentifier> token;
+  private final DataStreamOutput out;
+  private CompletableFuture<DataStreamReply> dataStreamCloseReply;
+  private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+  private final long syncSize = 0; // TODO: disk sync is disabled for now
+  private long syncPosition = 0;
+
+  /**
+   * Creates a new BlockOutputStream.
+   *
+   * @param blockID              block ID
+   * @param xceiverClientManager client manager that controls client
+   * @param pipeline             pipeline where block will be written
+   * @param bufferPool           pool of buffers
+   */
+  public BlockDataStreamOutput(
+      BlockID blockID,
+      XceiverClientFactory xceiverClientManager,
+      Pipeline pipeline,
+      BufferPool bufferPool,
+      OzoneClientConfig config,
+      Token<? extends TokenIdentifier> token
+  ) throws IOException {
+    this.xceiverClientFactory = xceiverClientManager;
+    this.config = config;
+    this.blockID = new AtomicReference<>(blockID);
+    KeyValue keyValue =
+        KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
+    this.containerBlockData =
+        BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
+            .addMetadata(keyValue);
+    this.xceiverClient =
+        (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
+    // Alternatively, stream setup can be delayed till the first chunk write.
+    this.out = setupStream();
+    this.bufferPool = bufferPool;
+    this.token = token;
+
+    //number of buffers used before doing a flush
+    refreshCurrentBuffer(bufferPool);
+    flushPeriod = (int) (config.getStreamBufferFlushSize() / config
+        .getStreamBufferSize());
+
+    Preconditions
+        .checkArgument(
+            (long) flushPeriod * config.getStreamBufferSize() == config
+                .getStreamBufferFlushSize());
+
+    // A single thread executor handle the responses of async requests
+    responseExecutor = Executors.newSingleThreadExecutor();
+    commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
+    bufferList = null;
+    totalDataFlushedLength = 0;
+    writtenDataLength = 0;
+    failedServers = new ArrayList<>(0);
+    ioException = new AtomicReference<>(null);
+    checksum = new Checksum(config.getChecksumType(),
+        config.getBytesPerChecksum());
+  }
+
+  private DataStreamOutput setupStream() throws IOException {
+    // Execute a dummy WriteChunk request to get the path of the target file,
+    // but does NOT write any data to it.
+    ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest =
+        ContainerProtos.WriteChunkRequestProto.newBuilder()
+            .setBlockID(blockID.get().getDatanodeBlockIDProtobuf());
+
+    String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+    ContainerProtos.ContainerCommandRequestProto.Builder builder =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.StreamInit)
+            .setContainerID(blockID.get().getContainerID())
+            .setDatanodeUuid(id).setWriteChunk(writeChunkRequest);
+
+    ContainerCommandRequestMessage message =
+        ContainerCommandRequestMessage.toMessage(builder.build(), null);
+
+    return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
+        .stream(message.getContent().asReadOnlyByteBuffer());
+  }
+
+  private void refreshCurrentBuffer(BufferPool pool) {
+    currentBuffer = pool.getCurrentBuffer();
+    currentBufferRemaining =
+        currentBuffer != null ? currentBuffer.remaining() : 0;
+  }
+
+  public BlockID getBlockID() {
+    return blockID.get();
+  }
+
+  public long getTotalAckDataLength() {
+    return commitWatcher.getTotalAckDataLength();
+  }
+
+  public long getWrittenDataLength() {
+    return writtenDataLength;
+  }
+
+  public List<DatanodeDetails> getFailedServers() {
+    return failedServers;
+  }
+
+  @VisibleForTesting
+  public XceiverClientRatis getXceiverClient() {
+    return xceiverClient;
+  }
+
+  @VisibleForTesting
+  public long getTotalDataFlushedLength() {
+    return totalDataFlushedLength;
+  }
+
+  @VisibleForTesting
+  public BufferPool getBufferPool() {
+    return bufferPool;
+  }
+
+  public IOException getIoException() {
+    return ioException.get();
+  }
+
+  @VisibleForTesting
+  public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
+    return commitWatcher.getCommitIndex2flushedDataMap();
+  }
+
+  @Override
+  public void write(ByteBuf b) throws IOException {
+    checkOpen();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    int off = 0;

Review comment:
       It should be
   ```
       int off = b.readerIndex();
   ```
   The readable bytes of a ByteBuf start at readerIndex(), not at index 0, so starting from 0 could resend bytes that were already consumed from the buffer.
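   To illustrate the difference (standalone demo, not part of the PR):
   ```
   import io.netty.buffer.ByteBuf;
   import io.netty.buffer.Unpooled;

   public final class ReaderIndexDemo {
     public static void main(String[] args) {
       ByteBuf b = Unpooled.wrappedBuffer(new byte[] {10, 20, 30, 40});
       b.readByte();                           // consume one byte
       System.out.println(b.readerIndex());    // 1 -- readable data starts here
       System.out.println(b.readableBytes());  // 3 bytes remain readable
       // Starting the write at offset 0 would resend the consumed byte.
     }
   }
   ```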




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


