You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/11/07 18:11:58 UTC
[ozone] 05/40: HDDS-5488. [Ozone-Streaming] Add a new BlockOutputStream/KeyOutputStream to support streaming api (#2495)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit e6be97e0c68d99752258bee9b4e5057ee944e06a
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Thu Aug 12 18:09:38 2021 +0800
HDDS-5488. [Ozone-Streaming] Add a new BlockOutputStream/KeyOutputStream to support streaming api (#2495)
(cherry picked from commit d33d9cb02184a77a170b0890dc5fbb6465b3b0e8)
(cherry picked from commit 18db09d7d4e20f020b9fe2a73d9daac97b2ce51c)
(cherry picked from commit 0952195d73d6512b4c053ffc41723ec76e7289f9)
---
hadoop-hdds/client/pom.xml | 4 +
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 5 +
.../hdds/scm/storage/BlockDataStreamOutput.java | 760 +++++++++++++++++++++
.../hdds/scm/storage/ByteBufStreamOutput.java | 58 ++
.../apache/hadoop/ozone/client/OzoneBucket.java | 19 +
.../client/io/BlockDataStreamOutputEntry.java | 294 ++++++++
.../client/io/BlockDataStreamOutputEntryPool.java | 324 +++++++++
.../ozone/client/io/KeyDataStreamOutput.java | 629 +++++++++++++++++
.../ozone/client/io/OzoneDataStreamOutput.java | 70 ++
.../ozone/client/protocol/ClientProtocol.java | 15 +
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 62 ++
.../client/rpc/TestBlockDataStreamOutput.java | 181 +++++
.../apache/hadoop/ozone/container/TestHelper.java | 22 +-
.../hadoop/ozone/shell/keys/PutKeyHandler.java | 40 +-
14 files changed, 2477 insertions(+), 6 deletions(-)
diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 3e3ff0cb9f..2e5bb745fa 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -74,6 +74,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index d0fd0db129..8e4ab16ee8 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdds.tracing.TracingUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
@@ -382,4 +383,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
throw new UnsupportedOperationException(
"Operation Not supported for ratis client");
}
+
+  /**
+   * Returns the Ratis streaming API of the underlying {@link RaftClient}.
+   *
+   * @return the {@link DataStreamApi} obtained from the Raft client
+   */
+  public DataStreamApi getDataStreamApi() {
+    final RaftClient raftClient = getClient();
+    return raftClient.getDataStreamApi();
+  }
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
new file mode 100644
index 0000000000..f658df1af9
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
+
+/**
+ * A {@link ByteBufStreamOutput} that writes the data of one block to a
+ * container through the Ratis streaming API ({@link DataStreamOutput}).
+ * Incoming bytes are copied into buffers taken from the {@link BufferPool};
+ * each full buffer (and, on flush/close, the last partially filled buffer)
+ * is sent as a chunk over the data stream. A putBlock call, issued every
+ * {@code flushPeriod} buffers and on flush/close, carries the complete list
+ * of chunks written so far for the block. Chunk names are built from the
+ * block's local ID and a monotonically increasing chunk index. Buffers stay
+ * referenced by the commit watcher after a putBlock until the corresponding
+ * commit is acknowledged.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
+ */
+public class BlockDataStreamOutput implements ByteBufStreamOutput {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(BlockDataStreamOutput.class);
+ public static final String EXCEPTION_MSG =
+ "Unexpected Storage Container Exception: ";
+ private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = {}; // passed to toArray() when combining the chunk-write futures
+
+ private AtomicReference<BlockID> blockID; // replaced with the block ID returned by each successful putBlock
+
+ private final BlockData.Builder containerBlockData;
+ private XceiverClientFactory xceiverClientFactory;
+ private XceiverClientRatis xceiverClient;
+ private OzoneClientConfig config;
+
+ private int chunkIndex; // last index used in a chunk name
+ private final AtomicLong chunkOffset = new AtomicLong(); // block offset of the next chunk
+ private final BufferPool bufferPool;
+ // The IOException will be set by response handling thread in case there is an
+ // exception received in the response. If the exception is set, the next
+ // request will fail upfront.
+ private final AtomicReference<IOException> ioException;
+ private final ExecutorService responseExecutor;
+
+ // the effective length of data flushed so far
+ private long totalDataFlushedLength;
+
+ // effective data write attempted so far for the block
+ private long writtenDataLength;
+
+ // List containing buffers for which the putBlock call will
+ // update the length in the datanodes. This list will just maintain
+ // references to the buffers in the BufferPool which will be cleared
+ // when the watchForCommit acknowledges a putBlock logIndex has been
+ // committed on all datanodes. This list will be a place holder for buffers
+ // which got written between successive putBlock calls.
+ private List<ChunkBuffer> bufferList;
+
+ // This object will maintain the commitIndexes and byteBufferList in order
+ // Also, corresponding to the logIndex, the corresponding list of buffers will
+ // be released from the buffer pool.
+ private final CommitWatcher commitWatcher;
+
+ private final List<DatanodeDetails> failedServers;
+ private final Checksum checksum;
+
+ //number of buffers used before doing a flush/putBlock.
+ private int flushPeriod;
+ //bytes remaining to write in the current buffer.
+ private int currentBufferRemaining;
+ //current buffer allocated to write
+ private ChunkBuffer currentBuffer;
+ private final Token<? extends TokenIdentifier> token; // sent with every putBlock request
+ private final DataStreamOutput out; // Ratis data stream opened by setupStream() in the constructor
+ private CompletableFuture<DataStreamReply> dataStreamCloseReply; // set by executePutBlock when close is true
+ private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>(); // pending chunk writes; NOTE(review): never cleared, so each putBlock waits on every chunk written so far
+ private final long syncSize = 0; // TODO: disk sync is disabled for now
+ private long syncPosition = 0; // position of the last SYNC write, see needSync()
+
+ /**
+ * Creates a new BlockDataStreamOutput: acquires a Ratis client for the
+ * pipeline and immediately opens the data stream for the block.
+ *
+ * @param blockID block ID
+ * @param xceiverClientManager client manager that controls client; the
+ * client it returns for the pipeline is cast to XceiverClientRatis
+ * @param pipeline pipeline where block will be written
+ * @param bufferPool pool of buffers
+ * @param config client configuration (stream buffer sizes, checksum type
+ * and bytes per checksum)
+ * @param token token sent with putBlock requests
+ * @throws IOException if acquiring the client or setting up the stream
+ * fails
+ */
+ public BlockDataStreamOutput(
+ BlockID blockID,
+ XceiverClientFactory xceiverClientManager,
+ Pipeline pipeline,
+ BufferPool bufferPool,
+ OzoneClientConfig config,
+ Token<? extends TokenIdentifier> token
+ ) throws IOException {
+ this.xceiverClientFactory = xceiverClientManager;
+ this.config = config;
+ this.blockID = new AtomicReference<>(blockID);
+ KeyValue keyValue =
+ KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
+ this.containerBlockData =
+ BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
+ .addMetadata(keyValue);
+ this.xceiverClient =
+ (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
+ // Alternatively, stream setup can be delayed till the first chunk write.
+ this.out = setupStream();
+ this.bufferPool = bufferPool;
+ this.token = token;
+
+ //pick up the pool's current buffer, then compute buffers per flush
+ refreshCurrentBuffer(bufferPool);
+ flushPeriod = (int) (config.getStreamBufferFlushSize() / config
+ .getStreamBufferSize());
+
+ Preconditions
+ .checkArgument(
+ (long) flushPeriod * config.getStreamBufferSize() == config
+ .getStreamBufferFlushSize());
+
+ // A single-thread executor handles the responses of async requests
+ responseExecutor = Executors.newSingleThreadExecutor();
+ commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
+ bufferList = null;
+ totalDataFlushedLength = 0;
+ writtenDataLength = 0;
+ failedServers = new ArrayList<>(0);
+ ioException = new AtomicReference<>(null);
+ checksum = new Checksum(config.getChecksumType(),
+ config.getBytesPerChecksum());
+ }
+ /**
+ * Opens the Ratis data stream for this block by sending a StreamInit
+ * request (carrying the block ID) to the pipeline's first node.
+ *
+ * @return the opened data stream
+ * @throws IOException declared for stream setup failures
+ */
+ private DataStreamOutput setupStream() throws IOException {
+ // Execute a dummy WriteChunk request to get the path of the target file,
+ // but does NOT write any data to it.
+ ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest =
+ ContainerProtos.WriteChunkRequestProto.newBuilder()
+ .setBlockID(blockID.get().getDatanodeBlockIDProtobuf());
+
+ String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+ ContainerProtos.ContainerCommandRequestProto.Builder builder =
+ ContainerProtos.ContainerCommandRequestProto.newBuilder()
+ .setCmdType(ContainerProtos.Type.StreamInit)
+ .setContainerID(blockID.get().getContainerID())
+ .setDatanodeUuid(id).setWriteChunk(writeChunkRequest);
+
+ ContainerCommandRequestMessage message =
+ ContainerCommandRequestMessage.toMessage(builder.build(), null);
+
+ return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
+ .stream(message.getContent().asReadOnlyByteBuffer());
+ }
+ /**
+ * Re-reads the pool's current buffer and how many bytes it can still
+ * accept (0 when the pool has no current buffer).
+ */
+ private void refreshCurrentBuffer(BufferPool pool) {
+ currentBuffer = pool.getCurrentBuffer();
+ currentBufferRemaining =
+ currentBuffer != null ? currentBuffer.remaining() : 0;
+ }
+ /** Returns the current block ID, replaced after each successful putBlock. */
+ public BlockID getBlockID() {
+ return blockID.get();
+ }
+
+ public long getTotalAckDataLength() {
+ return commitWatcher.getTotalAckDataLength();
+ }
+
+ public long getWrittenDataLength() {
+ return writtenDataLength;
+ }
+
+ public List<DatanodeDetails> getFailedServers() {
+ return failedServers;
+ }
+
+ @VisibleForTesting
+ public XceiverClientRatis getXceiverClient() {
+ return xceiverClient;
+ }
+
+ @VisibleForTesting
+ public long getTotalDataFlushedLength() {
+ return totalDataFlushedLength;
+ }
+
+ @VisibleForTesting
+ public BufferPool getBufferPool() {
+ return bufferPool;
+ }
+ /** Returns the first failure recorded by setIoException, or null if none. */
+ public IOException getIoException() {
+ return ioException.get();
+ }
+
+ @VisibleForTesting
+ public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
+ return commitWatcher.getCommitIndex2flushedDataMap();
+ }
+ /**
+ * Copies the readable bytes of {@code b} into pool buffers. Each buffer is
+ * sent as a chunk as soon as it is full, after which a putBlock and/or a
+ * wait for commits may be triggered. The reader index of {@code b} is not
+ * advanced.
+ *
+ * @param b the data to write; must not be null
+ * @throws IOException if the stream is closed or a previous request failed
+ */
+ @Override
+ public void write(ByteBuf b) throws IOException {
+ checkOpen();
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ int off = b.readerIndex();
+ int len = b.readableBytes();
+
+ while (len > 0) {
+ allocateNewBufferIfNeeded();
+ final int writeLen = Math.min(currentBufferRemaining, len);
+ // TODO: avoid buffer copy here
+ currentBuffer.put(b.nioBuffer(off, writeLen));
+ currentBufferRemaining -= writeLen;
+ writeChunkIfNeeded();
+ off += writeLen;
+ len -= writeLen;
+ writtenDataLength += writeLen;
+ doFlushOrWatchIfNeeded();
+ }
+ }
+ /** Sends the current buffer as a chunk once it has been completely filled. */
+ private void writeChunkIfNeeded() throws IOException {
+ if (currentBufferRemaining == 0) {
+ writeChunk(currentBuffer);
+ }
+ }
+ /**
+ * Once the current buffer is full: issues a putBlock every
+ * {@code flushPeriod} used buffers, and blocks in handleFullBuffer() when
+ * every buffer of the pool is in use.
+ */
+ private void doFlushOrWatchIfNeeded() throws IOException {
+ if (currentBufferRemaining == 0) {
+ if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
+ updateFlushLength();
+ executePutBlock(false, false);
+ }
+ // Data in the bufferPool can not exceed streamBufferMaxSize
+ if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
+ handleFullBuffer();
+ }
+ }
+ }
+ /** Allocates a new buffer from the pool once the current one is full. */
+ private void allocateNewBufferIfNeeded() {
+ if (currentBufferRemaining == 0) {
+ currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
+ currentBufferRemaining = currentBuffer.remaining();
+ }
+ }
+ /** Marks all data written so far as flushed. */
+ private void updateFlushLength() {
+ totalDataFlushedLength = writtenDataLength;
+ }
+ /** NOTE(review): not called anywhere in this class. */
+ private boolean isBufferPoolFull() {
+ return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
+ }
+
+ /**
+ * Re-sends data that is already cached in the buffer pool. Will be called
+ * on the retry path in case of closedContainerException/TimeoutException.
+ * Walks the pool's buffers starting from the first one, sending each full
+ * buffer as a chunk, and triggers putBlock / full-buffer handling based on
+ * the cumulative written length.
+ * @param len length of data to write; must not exceed the stream buffer
+ * max size
+ * @throws IOException if error occurred
+ */
+
+ // In this case, the data is already cached in the bufferPool's buffers.
+ public void writeOnRetry(long len) throws IOException {
+ if (len == 0) {
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrying write length {} for blockID {}", len, blockID);
+ }
+ Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
+ int count = 0;
+ while (len > 0) {
+ ChunkBuffer buffer = bufferPool.getBuffer(count);
+ long writeLen = Math.min(buffer.position(), len);
+ if (!buffer.hasRemaining()) {
+ writeChunk(buffer);
+ }
+ len -= writeLen;
+ count++;
+ writtenDataLength += writeLen;
+ // we should not call isBufferFull/shouldFlush here.
+ // The buffer might already be full as whole data is already cached in
+ // the buffer. We should just validate
+ // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
+ // call for handling full buffer/flush buffer condition.
+ if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
+ // record the flush length and issue a putBlock for the data
+ // written so far
+ updateFlushLength();
+ executePutBlock(false, false);
+ }
+ if (writtenDataLength == config.getStreamBufferMaxSize()) {
+ handleFullBuffer();
+ }
+ }
+ }
+
+ /**
+ * This is a blocking call. It first waits for all outstanding putBlock
+ * futures, then waits till the commit index at the head of the
+ * commitIndex2flushedDataMap gets replicated to all or majority.
+ * @throws IOException if the stream is closed, a request failed, or the
+ * wait was interrupted
+ */
+ private void handleFullBuffer() throws IOException {
+ try {
+ checkOpen();
+ if (!commitWatcher.getFutureMap().isEmpty()) {
+ waitOnFlushFutures();
+ }
+ } catch (ExecutionException e) {
+ handleExecutionException(e);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ handleInterruptedException(ex, true);
+ }
+ watchForCommit(true);
+ }
+
+
+ // It may happen that once the exception is encountered, we still might
+ // have successfully flushed up to a certain index. Make sure the buffers
+ // only contain data which have not been sufficiently replicated
+ private void adjustBuffersOnException() {
+ commitWatcher.releaseBuffersOnException();
+ refreshCurrentBuffer(bufferPool);
+ }
+
+ /**
+ * Waits, via the commit watcher, for the first (when {@code bufferFull})
+ * or the last flushed commit index to be replicated. Datanodes reported as
+ * failed in the reply are logged and added to failedServers.
+ * @param bufferFull flag indicating whether bufferFull condition is hit or
+ * it is called as part of flush/close
+ * @throws IOException IOException in case watch gets timed out; the
+ * exception is also recorded so later requests fail upfront
+ */
+ private void watchForCommit(boolean bufferFull) throws IOException {
+ checkOpen();
+ try {
+ XceiverClientReply reply = bufferFull ?
+ commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+ if (reply != null) {
+ List<DatanodeDetails> dnList = reply.getDatanodes();
+ if (!dnList.isEmpty()) {
+ Pipeline pipe = xceiverClient.getPipeline();
+
+ LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}",
+ blockID, pipe, dnList);
+ failedServers.addAll(dnList);
+ }
+ }
+ } catch (IOException ioe) {
+ setIoException(ioe);
+ throw getIoException();
+ }
+ refreshCurrentBuffer(bufferPool);
+
+ }
+
+ /**
+ * Waits for all pending chunk writes, closes the data stream when
+ * {@code close} is set, and sends an asynchronous putBlock carrying every
+ * chunk recorded so far. When the response validates, no failure has been
+ * recorded and {@code force} is false, the block ID (bcsId) is updated
+ * and the buffers written since the previous putBlock are registered with
+ * the commit watcher.
+ *
+ * @param close whether putBlock is happening as part of closing the stream
+ * @param force true if no data was written since most recent putBlock and
+ * stream is being closed
+ * @return the putBlock response future, also stored in the commit
+ * watcher's future map under the current flush length
+ * @throws IOException if a chunk write failed or the putBlock could not be
+ * sent
+ */
+ private CompletableFuture<ContainerProtos.
+ ContainerCommandResponseProto> executePutBlock(boolean close,
+ boolean force) throws IOException {
+ checkOpen();
+ long flushPos = totalDataFlushedLength;
+ final List<ChunkBuffer> byteBufferList;
+ if (!force) {
+ Preconditions.checkNotNull(bufferList);
+ byteBufferList = bufferList;
+ bufferList = null;
+ Preconditions.checkNotNull(byteBufferList);
+ } else {
+ byteBufferList = null;
+ }
+
+ try {
+ CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
+ } catch (Exception e) { // NOTE(review): an InterruptedException is wrapped here without restoring the thread's interrupt flag
+ LOG.warn("Failed to write all chunks through stream: " + e);
+ throw new IOException(e);
+ }
+ if (close) { // on close, finish the data stream before the final putBlock
+ dataStreamCloseReply = out.closeAsync();
+ }
+
+ CompletableFuture<ContainerProtos.
+ ContainerCommandResponseProto> flushFuture = null;
+ try {
+ BlockData blockData = containerBlockData.build();
+ XceiverClientReply asyncReply =
+ putBlockAsync(xceiverClient, blockData, close, token);
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+ asyncReply.getResponse();
+ flushFuture = future.thenApplyAsync(e -> {
+ try {
+ validateResponse(e);
+ } catch (IOException sce) {
+ throw new CompletionException(sce);
+ }
+ // if the ioException is not set, putBlock is successful
+ if (getIoException() == null && !force) {
+ BlockID responseBlockID = BlockID.getFromProtobuf(
+ e.getPutBlock().getCommittedBlockLength().getBlockID());
+ Preconditions.checkState(blockID.get().getContainerBlockID()
+ .equals(responseBlockID.getContainerBlockID()));
+ // updates the bcsId of the block
+ blockID.set(responseBlockID);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Adding index " + asyncReply.getLogIndex() + " commitMap size "
+ + commitWatcher.getCommitInfoMapSize() + " flushLength "
+ + flushPos + " numBuffers " + byteBufferList.size()
+ + " blockID " + blockID + " bufferPool size" + bufferPool
+ .getSize() + " currentBufferIndex " + bufferPool
+ .getCurrentBufferIndex());
+ }
+ // for standalone protocol, logIndex will always be 0.
+ commitWatcher
+ .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
+ }
+ return e;
+ }, responseExecutor).exceptionally(e -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putBlock failed for blockID {} with exception {}",
+ blockID, e.getLocalizedMessage());
+ }
+ CompletionException ce = new CompletionException(e);
+ setIoException(ce);
+ throw ce;
+ });
+ } catch (IOException | ExecutionException e) {
+ throw new IOException(EXCEPTION_MSG + e.toString(), e);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ handleInterruptedException(ex, false);
+ }
+ commitWatcher.getFutureMap().put(flushPos, flushFuture);
+ return flushFuture;
+ }
+ /**
+ * Sends buffered data (the partial chunk plus a putBlock) and waits for it
+ * to be committed. Does nothing if the stream is closed, no buffer has been
+ * allocated, or stream buffer flush delay is enabled and less than one
+ * stream buffer of data has been written since the last flush.
+ */
+ @Override
+ public void flush() throws IOException {
+ if (xceiverClientFactory != null && xceiverClient != null
+ && bufferPool != null && bufferPool.getSize() > 0
+ && (!config.isStreamBufferFlushDelay() ||
+ writtenDataLength - totalDataFlushedLength
+ >= config.getStreamBufferSize())) {
+ try {
+ handleFlush(false);
+ } catch (ExecutionException e) {
+ // just set the exception here as well in order to maintain sanctity of
+ // ioException field
+ handleExecutionException(e);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ handleInterruptedException(ex, true);
+ }
+ }
+ }
+ /**
+ * Adds {@code buffer} to bufferList for the next putBlock and sends its
+ * written bytes, {@code [0, position)}, as a chunk.
+ */
+ private void writeChunk(ChunkBuffer buffer)
+ throws IOException {
+ // This data in the buffer will be pushed to datanode and a reference will
+ // be added to the bufferList. Once putBlock gets executed, this list will
+ // be marked null. Hence, during first writeChunk call after every putBlock
+ // call or during the first call to writeChunk here, the list will be null.
+
+ if (bufferList == null) {
+ bufferList = new ArrayList<>();
+ }
+ bufferList.add(buffer);
+ writeChunkToContainer(buffer.duplicate(0, buffer.position()));
+ }
+
+ /**
+ * Sends any unflushed data as a (possibly partial) chunk followed by a
+ * putBlock, or a forced empty putBlock when closing with no new data, then
+ * waits for all putBlock futures and for the last commit index.
+ *
+ * @param close whether the flush is happening as part of closing the stream
+ */
+ private void handleFlush(boolean close)
+ throws IOException, InterruptedException, ExecutionException {
+ checkOpen();
+ // flush the last chunk data residing on the currentBuffer
+ if (totalDataFlushedLength < writtenDataLength) {
+ refreshCurrentBuffer(bufferPool);
+ Preconditions.checkArgument(currentBuffer.position() > 0);
+ if (currentBuffer.hasRemaining()) {
+ writeChunk(currentBuffer);
+ }
+ // This can be a partially filled chunk. Since we are flushing the buffer
+ // here, we just limit this buffer to the current position. So that next
+ // write will happen in new buffer
+ updateFlushLength();
+ executePutBlock(close, false);
+ } else if (close) {
+ // forcing an "empty" putBlock if stream is being closed without new
+ // data since latest flush - we need to send the "EOF" flag
+ executePutBlock(true, true);
+ }
+ waitOnFlushFutures();
+ watchForCommit(false);
+ // just check again if the exception is hit while waiting for the
+ // futures to ensure flush has indeed succeeded
+
+ // irrespective of whether the commitIndex2flushedDataMap is empty
+ // or not, ensure there is no exception set
+ checkOpen();
+ }
+ /**
+ * Flushes remaining data with a final putBlock (which also closes the data
+ * stream), waits for the stream close reply, and always calls
+ * cleanup(false) afterwards.
+ * NOTE(review): this is a no-op when bufferPool.getSize() is 0
+ * (presumably nothing was written); in that case the data stream opened
+ * in the constructor is not closed and the client is not released.
+ */
+ @Override
+ public void close() throws IOException {
+ if (xceiverClientFactory != null && xceiverClient != null
+ && bufferPool != null && bufferPool.getSize() > 0) {
+ try {
+ handleFlush(true);
+ dataStreamCloseReply.get(); // set by executePutBlock(close=true) inside handleFlush(true)
+ } catch (ExecutionException e) {
+ handleExecutionException(e);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ handleInterruptedException(ex, true);
+ } finally {
+ cleanup(false);
+ }
+ // TODO: Turn the below buffer empty check on when Standalone pipeline
+ // is removed in the write path in tests
+ // Preconditions.checkArgument(buffer.position() == 0);
+ // bufferPool.checkBufferPoolEmpty();
+
+ }
+ }
+ /** Blocks until every putBlock future in the commit watcher's map completes. */
+ private void waitOnFlushFutures()
+ throws InterruptedException, ExecutionException {
+ CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+ commitWatcher.getFutureMap().values().toArray(
+ new CompletableFuture[commitWatcher.getFutureMap().size()]));
+ // wait for all the transactions to complete
+ combinedFuture.get();
+ }
+ /**
+ * Throws the previously recorded failure, if any; otherwise validates
+ * {@code responseProto}, recording a StorageContainerException before
+ * rethrowing it.
+ */
+ private void validateResponse(
+ ContainerProtos.ContainerCommandResponseProto responseProto)
+ throws IOException {
+ try {
+ // if the ioException is already set, it means a prev request has failed
+ // just throw the exception. The current operation will fail with the
+ // original error
+ IOException exception = getIoException();
+ if (exception != null) {
+ throw exception;
+ }
+ ContainerProtocolCalls.validateContainerResponse(responseProto);
+ } catch (StorageContainerException sce) {
+ setIoException(sce);
+ throw sce;
+ }
+ }
+
+ /**
+ * Records {@code e}, wrapped in an IOException, as the first failure of
+ * this stream; if a failure was already recorded, {@code e} is only logged
+ * at debug level.
+ */
+ private void setIoException(Exception e) {
+ IOException ioe = getIoException();
+ if (ioe == null) {
+ IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e);
+ ioException.compareAndSet(null, exception);
+ } else {
+ LOG.debug("Previous request had already failed with " + ioe.toString()
+ + " so subsequent request also encounters"
+ + " Storage Container Exception ", e);
+ }
+ }
+ /**
+ * Releases the client to the factory (invalidating it if requested),
+ * cleans up the commit watcher, clears the buffer list and shuts down the
+ * response executor. Afterwards isClosed() returns true.
+ *
+ * @param invalidateClient passed to releaseClient
+ */
+ public void cleanup(boolean invalidateClient) {
+ if (xceiverClientFactory != null) {
+ xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
+ }
+ xceiverClientFactory = null;
+ xceiverClient = null;
+ commitWatcher.cleanup();
+ if (bufferList != null) {
+ bufferList.clear();
+ }
+ bufferList = null;
+ responseExecutor.shutdown();
+ }
+
+ /**
+ * Throws if the stream is closed. If an earlier request has failed,
+ * releases buffers via adjustBuffersOnException() and rethrows the
+ * recorded exception.
+ *
+ * @throws IOException if stream is closed or a previous request failed
+ */
+ private void checkOpen() throws IOException {
+ if (isClosed()) {
+ throw new IOException("BlockDataStreamOutput has been closed.");
+ } else if (getIoException() != null) {
+ adjustBuffersOnException();
+ throw getIoException();
+ }
+ }
+ /** Returns true once cleanup() has cleared the client. */
+ public boolean isClosed() {
+ return xceiverClient == null;
+ }
+ /**
+ * Returns true, and records {@code position} as the last sync position,
+ * when at least syncSize bytes were written since the previous sync.
+ * Always false while syncSize is 0 (disk sync disabled).
+ */
+ private boolean needSync(long position) {
+ if (syncSize > 0) {
+ // TODO: or position >= fileLength
+ if (position - syncPosition >= syncSize) {
+ syncPosition = position;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Sends the remaining bytes of {@code chunk} as an asynchronous write on
+ * the data stream and adds the chunk's info (name, offset, length,
+ * checksum) to the block data sent by the next putBlock call. A failed or
+ * unsuccessful write is recorded via setIoException.
+ *
+ * @param chunk buffer holding the chunk data
+ * @throws IOException if there is an I/O error while performing the call
+ * @throws OzoneChecksumException if there is an error while computing
+ * checksum
+ */
+ private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
+ int effectiveChunkSize = chunk.remaining();
+ final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
+ final ByteString data = chunk.toByteString(
+ bufferPool.byteStringConversion());
+ ChecksumData checksumData = checksum.computeChecksum(chunk);
+ ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+ .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
+ .setOffset(offset)
+ .setLen(effectiveChunkSize)
+ .setChecksumData(checksumData.getProtoBufMessage())
+ .build();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing chunk {} length {} at offset {}",
+ chunkInfo.getChunkName(), effectiveChunkSize, offset);
+ }
+
+ CompletableFuture<DataStreamReply> future =
+ (needSync(offset + effectiveChunkSize) ?
+ out.writeAsync(data.asReadOnlyByteBuffer(), StandardWriteOption.SYNC) :
+ out.writeAsync(data.asReadOnlyByteBuffer()))
+ .whenCompleteAsync((r, e) -> {
+ if (e != null || !r.isSuccess()) {
+ if (e == null) {
+ e = new IOException("result is not success");
+ }
+ String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
+ " " + "into block " + blockID;
+ LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+ CompletionException ce = new CompletionException(msg, e);
+ setIoException(ce);
+ throw ce;
+ }
+ }, responseExecutor);
+
+ futures.add(future);
+ containerBlockData.addChunks(chunkInfo);
+ }
+ /** Replaces the client; intended for tests only. */
+ @VisibleForTesting
+ public void setXceiverClient(XceiverClientRatis xceiverClient) {
+ this.xceiverClient = xceiverClient;
+ }
+
+ /**
+ * Handles an InterruptedException. Logs an error and then always throws an
+ * IOException.
+ *
+ * @param ex the interruption
+ * @param processExecutionException if TRUE, the exception is handled like
+ * an ExecutionException (recorded, buffers adjusted, recorded exception
+ * thrown); otherwise it is wrapped in a new IOException and thrown
+ * @throws IOException always
+ */
+ private void handleInterruptedException(Exception ex,
+ boolean processExecutionException)
+ throws IOException {
+ LOG.error("Command execution was interrupted.");
+ if(processExecutionException) {
+ handleExecutionException(ex);
+ } else {
+ throw new IOException(EXCEPTION_MSG + ex.toString(), ex);
+ }
+ }
+
+ /**
+ * Handles ExecutionException by adjusting buffers: records {@code ex} as
+ * the stream's failure (unless one is already recorded), releases buffers
+ * via adjustBuffersOnException() and throws the recorded exception.
+ * @param ex the failure to record
+ * @throws IOException always; the first failure recorded on this stream
+ */
+ private void handleExecutionException(Exception ex) throws IOException {
+ setIoException(ex);
+ adjustBuffersOnException();
+ throw getIoException();
+ }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
new file mode 100644
index 0000000000..7f40737b70
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A sink for Netty {@link ByteBuf} data: implementations accept buffers and
+ * send their contents on to some destination, such as a data stream.
+ */
+public interface ByteBufStreamOutput extends Closeable {
+
+  /**
+   * Writes all the bytes of {@code b} to the stream.
+   *
+   * @param b the data to write
+   * @throws IOException if an I/O error occurs
+   */
+  void write(ByteBuf b) throws IOException;
+
+  /**
+   * Writes the range {@code [off, off + len)} of {@code b} to the stream.
+   * The default implementation wraps that range in a {@link ByteBuf#slice}
+   * view, which shares content with {@code b} instead of copying it, and
+   * hands the view to {@link #write(ByteBuf)}.
+   *
+   * @param b the data
+   * @param off absolute index in {@code b} of the first byte to write
+   * @param len number of bytes to write
+   * @throws IOException if an I/O error occurs
+   */
+  default void write(ByteBuf b, int off, int len) throws IOException {
+    final ByteBuf region = b.slice(off, len);
+    write(region);
+  }
+
+  /**
+   * Flushes this output, forcing any buffered bytes to be written out.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  void flush() throws IOException;
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 596afa6371..2d27858e16 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -599,6 +600,24 @@ public class OzoneBucket extends WithMetadata {
.createKey(volumeName, name, key, size, replicationConfig, keyMetadata);
}
+  /**
+   * Creates a new key in the bucket for writing through the streaming data
+   * path. It returns an {@link OzoneDataStreamOutput} rather than an
+   * {@link OzoneOutputStream}.
+   *
+   * @param key Name of the key to be created.
+   * @param size Size of the data the key will point to.
+   * @param replicationConfig Replication configuration.
+   * @param keyMetadata Custom metadata to store with the key.
+   * @return OzoneDataStreamOutput to which the data has to be written.
+   * @throws IOException if the underlying client protocol call fails.
+   */
+  public OzoneDataStreamOutput createStreamKey(String key, long size,
+      ReplicationConfig replicationConfig,
+      Map<String, String> keyMetadata)
+      throws IOException {
+    return proxy.createStreamKey(volumeName, name, key, size,
+        replicationConfig, keyMetadata);
+  }
+
/**
* Reads an existing key from the bucket.
*
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
new file mode 100644
index 0000000000..6954742601
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * One block of a key written through the streaming (Netty ByteBuf) data
+ * path. It wraps a {@link BlockDataStreamOutput} that is created lazily on
+ * the first write, retry or cleanup, and it tracks how many bytes have been
+ * written to this block.
+ */
+public final class BlockDataStreamOutputEntry
+    implements ByteBufStreamOutput {
+
+  private final OzoneClientConfig config;
+  // Underlying block stream; null until checkStream() creates it.
+  private ByteBufStreamOutput byteBufStreamOutput;
+  // Refreshed from the underlying stream in close() and
+  // getTotalAckDataLength(), because it may then carry an updated bcsId.
+  private BlockID blockID;
+  private final String key;
+  private final XceiverClientFactory xceiverClientManager;
+  private final Pipeline pipeline;
+  // total number of bytes that should be written to this stream
+  private final long length;
+  // the current position of this stream; normally
+  // 0 <= currentPosition <= length (it equals length once the block is full)
+  private long currentPosition;
+  private final Token<OzoneBlockTokenIdentifier> token;
+
+  private final BufferPool bufferPool;
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  private BlockDataStreamOutputEntry(
+      BlockID blockID, String key,
+      XceiverClientFactory xceiverClientManager,
+      Pipeline pipeline,
+      long length,
+      BufferPool bufferPool,
+      Token<OzoneBlockTokenIdentifier> token,
+      OzoneClientConfig config
+  ) {
+    this.config = config;
+    this.byteBufStreamOutput = null;
+    this.blockID = blockID;
+    this.key = key;
+    this.xceiverClientManager = xceiverClientManager;
+    this.pipeline = pipeline;
+    this.token = token;
+    this.length = length;
+    this.currentPosition = 0;
+    this.bufferPool = bufferPool;
+  }
+
+  long getLength() {
+    return length;
+  }
+
+  Token<OzoneBlockTokenIdentifier> getToken() {
+    return token;
+  }
+
+  /** Returns how many more bytes this block is expected to accept. */
+  long getRemaining() {
+    return length - currentPosition;
+  }
+
+  /**
+   * Creates the underlying BlockDataStreamOutput if it does not exist yet.
+   * This keeps xceiverClient initialization out of preallocation, so it only
+   * happens when the block is actually used.
+   * @throws IOException if xceiverClient initialization fails
+   */
+  private void checkStream() throws IOException {
+    if (this.byteBufStreamOutput == null) {
+      this.byteBufStreamOutput =
+          new BlockDataStreamOutput(blockID, xceiverClientManager,
+              pipeline, bufferPool, config, token);
+    }
+  }
+
+  /**
+   * Returns the underlying stream as a {@link BlockDataStreamOutput}. Only
+   * valid once the stream has been created (is non-null).
+   */
+  private BlockDataStreamOutput blockDataStreamOutput() {
+    return (BlockDataStreamOutput) byteBufStreamOutput;
+  }
+
+  /**
+   * Writes all readable bytes of {@code b} to this block, creating the
+   * underlying stream if needed. The current position advances by the
+   * number of bytes that were readable before the write.
+   */
+  @Override
+  public void write(ByteBuf b) throws IOException {
+    checkStream();
+    final int len = b.readableBytes();
+    byteBufStreamOutput.write(b);
+    this.currentPosition += len;
+  }
+
+  /** Flushes the underlying stream; does nothing if it was never created. */
+  @Override
+  public void flush() throws IOException {
+    if (this.byteBufStreamOutput != null) {
+      this.byteBufStreamOutput.flush();
+    }
+  }
+
+  /**
+   * Closes the underlying stream, if it was created, and then refreshes the
+   * block ID from it.
+   */
+  @Override
+  public void close() throws IOException {
+    if (this.byteBufStreamOutput != null) {
+      this.byteBufStreamOutput.close();
+      // after closing the underlying stream, the blockID has been
+      // reconstructed with the updated bcsId
+      this.blockID = blockDataStreamOutput().getBlockID();
+    }
+  }
+
+  /** Returns true only if the underlying stream exists and is closed. */
+  boolean isClosed() {
+    return byteBufStreamOutput != null && blockDataStreamOutput().isClosed();
+  }
+
+  /**
+   * Returns the number of bytes acknowledged by the underlying stream, and
+   * refreshes the block ID from it.
+   */
+  long getTotalAckDataLength() {
+    if (byteBufStreamOutput == null) {
+      // A pre-allocated block that has not been written to yet has no
+      // underlying stream, so nothing has been acknowledged.
+      return 0;
+    }
+    BlockDataStreamOutput out = blockDataStreamOutput();
+    blockID = out.getBlockID();
+    return out.getTotalAckDataLength();
+  }
+
+  /** Returns the servers the underlying stream reported as failed. */
+  Collection<DatanodeDetails> getFailedServers() {
+    if (byteBufStreamOutput == null) {
+      return Collections.emptyList();
+    }
+    return blockDataStreamOutput().getFailedServers();
+  }
+
+  /** Returns the number of bytes written to the underlying stream so far. */
+  long getWrittenDataLength() {
+    if (byteBufStreamOutput == null) {
+      // A pre-allocated block that has not been written to yet has no
+      // underlying stream, so nothing has been written.
+      return 0;
+    }
+    return blockDataStreamOutput().getWrittenDataLength();
+  }
+
+  /**
+   * Cleans up the underlying stream.
+   *
+   * @param invalidateClient passed through to BlockDataStreamOutput#cleanup.
+   */
+  void cleanup(boolean invalidateClient) throws IOException {
+    // NOTE(review): this creates the underlying stream if it does not exist
+    // yet, only to clean it up immediately.
+    checkStream();
+    blockDataStreamOutput().cleanup(invalidateClient);
+  }
+
+  /**
+   * Re-writes {@code len} bytes on the retry path and advances the current
+   * position by {@code len}.
+   */
+  void writeOnRetry(long len) throws IOException {
+    checkStream();
+    blockDataStreamOutput().writeOnRetry(len);
+    this.currentPosition += len;
+  }
+
+  /**
+   * Builder class for BlockDataStreamOutputEntry.
+   * */
+  public static class Builder {
+
+    private BlockID blockID;
+    private String key;
+    private XceiverClientFactory xceiverClientManager;
+    private Pipeline pipeline;
+    private long length;
+    private BufferPool bufferPool;
+    private Token<OzoneBlockTokenIdentifier> token;
+    private OzoneClientConfig config;
+
+    public Builder setBlockID(BlockID bID) {
+      this.blockID = bID;
+      return this;
+    }
+
+    public Builder setKey(String keys) {
+      this.key = keys;
+      return this;
+    }
+
+    public Builder setXceiverClientManager(
+        XceiverClientFactory
+            xClientManager) {
+      this.xceiverClientManager = xClientManager;
+      return this;
+    }
+
+    public Builder setPipeline(Pipeline ppln) {
+      this.pipeline = ppln;
+      return this;
+    }
+
+    public Builder setLength(long len) {
+      this.length = len;
+      return this;
+    }
+
+    public Builder setBufferPool(BufferPool pool) {
+      this.bufferPool = pool;
+      return this;
+    }
+
+    public Builder setConfig(OzoneClientConfig clientConfig) {
+      this.config = clientConfig;
+      return this;
+    }
+
+    public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
+      this.token = bToken;
+      return this;
+    }
+
+    public BlockDataStreamOutputEntry build() {
+      return new BlockDataStreamOutputEntry(blockID,
+          key,
+          xceiverClientManager,
+          pipeline,
+          length,
+          bufferPool,
+          token, config);
+    }
+  }
+
+  @VisibleForTesting
+  public ByteBufStreamOutput getByteBufStreamOutput() {
+    return byteBufStreamOutput;
+  }
+
+  public BlockID getBlockID() {
+    return blockID;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public XceiverClientFactory getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  public long getCurrentPosition() {
+    return currentPosition;
+  }
+
+  public BufferPool getBufferPool() {
+    return bufferPool;
+  }
+
+  public void setCurrentPosition(long curPosition) {
+    this.currentPosition = curPosition;
+  }
+}
+
+
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
new file mode 100644
index 0000000000..94c505f2af
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -0,0 +1,324 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * This class manages the stream entries list and handles block allocation
+ * from OzoneManager.
+ */
+public class BlockDataStreamOutputEntryPool {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockDataStreamOutputEntryPool.class);
+
+  private final List<BlockDataStreamOutputEntry> streamEntries;
+  private final OzoneClientConfig config;
+  // index into streamEntries of the entry currently being written
+  private int currentStreamIndex;
+  private final OzoneManagerProtocol omClient;
+  private final OmKeyArgs keyArgs;
+  private final XceiverClientFactory xceiverClientFactory;
+  private final String requestID;
+  private final BufferPool bufferPool;
+  private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
+  private final long openID;
+  private final ExcludeList excludeList;
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public BlockDataStreamOutputEntryPool(
+      OzoneClientConfig config,
+      OzoneManagerProtocol omClient,
+      String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber,
+      boolean isMultipart, OmKeyInfo info,
+      boolean unsafeByteBufferConversion,
+      XceiverClientFactory xceiverClientFactory, long openID
+  ) {
+    this.config = config;
+    this.xceiverClientFactory = xceiverClientFactory;
+    streamEntries = new ArrayList<>();
+    currentStreamIndex = 0;
+    this.omClient = omClient;
+    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
+        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
+        .setReplicationConfig(replicationConfig).setDataSize(info.getDataSize())
+        .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
+        .setMultipartUploadPartNumber(partNumber).build();
+    this.requestID = requestId;
+    this.openID = openID;
+    this.excludeList = new ExcludeList();
+
+    this.bufferPool =
+        new BufferPool(config.getStreamBufferSize(),
+            (int) (config.getStreamBufferMaxSize() / config
+                .getStreamBufferSize()),
+            ByteStringConversion
+                .createByteBufferConversion(unsafeByteBufferConversion));
+  }
+
+  /**
+   * A constructor for testing purpose only.
+   *
+   * @see KeyDataStreamOutput#KeyDataStreamOutput()
+   */
+  @VisibleForTesting
+  BlockDataStreamOutputEntryPool() {
+    streamEntries = new ArrayList<>();
+    omClient = null;
+    keyArgs = null;
+    xceiverClientFactory = null;
+    config =
+        new OzoneConfiguration().getObject(OzoneClientConfig.class);
+    config.setStreamBufferSize(0);
+    config.setStreamBufferMaxSize(0);
+    config.setStreamBufferFlushSize(0);
+    config.setStreamBufferFlushDelay(false);
+    requestID = null;
+    int chunkSize = 0;
+    bufferPool = new BufferPool(chunkSize, 1);
+
+    currentStreamIndex = 0;
+    openID = -1;
+    excludeList = new ExcludeList();
+  }
+
+  /**
+   * When a key is opened, OM may already have allocated some blocks to it
+   * for this open session. To use them, they are added to the stream
+   * entries. A key's location group also holds blocks from previous
+   * versions, which must not be picked for writing, so only the blocks
+   * created in this particular open version are added.
+   *
+   * @param version the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException declared for callers; not thrown by the visible body.
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    // server may return any number of blocks, (0 to any)
+    // only the blocks allocated in this open session (block createVersion
+    // equals to open session version)
+    for (OmKeyLocationInfo subKeyInfo : version.getLocationList(openVersion)) {
+      addKeyLocationInfo(subKeyInfo);
+    }
+  }
+
+  /** Appends a stream entry for the given block; its pipeline must be set. */
+  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+    Preconditions.checkNotNull(subKeyInfo.getPipeline());
+    BlockDataStreamOutputEntry.Builder builder =
+        new BlockDataStreamOutputEntry.Builder()
+            .setBlockID(subKeyInfo.getBlockID())
+            .setKey(keyArgs.getKeyName())
+            .setXceiverClientManager(xceiverClientFactory)
+            .setPipeline(subKeyInfo.getPipeline())
+            .setConfig(config)
+            .setLength(subKeyInfo.getLength())
+            .setBufferPool(bufferPool)
+            .setToken(subKeyInfo.getToken());
+    streamEntries.add(builder.build());
+  }
+
+  /**
+   * Returns location info for every stream entry that has data, with the
+   * entry's current position as the block length. Empty entries are left
+   * out.
+   */
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    for (BlockDataStreamOutputEntry streamEntry : streamEntries) {
+      long length = streamEntry.getCurrentPosition();
+
+      // Commit only those blocks to OzoneManager which are not empty
+      if (length != 0) {
+        OmKeyLocationInfo info =
+            new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
+                .setLength(length).setOffset(0)
+                .setToken(streamEntry.getToken())
+                .setPipeline(streamEntry.getPipeline()).build();
+        locationInfoList.add(info);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("block written {}, length {} bcsID {}",
+            streamEntry.getBlockID(), length,
+            streamEntry.getBlockID().getBlockCommitSequenceId());
+      }
+    }
+    return locationInfoList;
+  }
+
+  /**
+   * Discards the subsequent pre-allocated blocks, removing from the
+   * streamEntries list those on the given pipeline or in the given closed
+   * container. Every discarded candidate must not have been written to yet.
+   *
+   * @param containerID id of the closed container, or -1 to match none
+   * @param pipelineId id of the associated pipeline; may be null, which
+   *     matches no pipeline
+   */
+  void discardPreallocatedBlocks(long containerID, PipelineID pipelineId) {
+    // currentStreamIndex < streamEntries.size() signifies that, there are still
+    // pre allocated blocks available.
+
+    // This will be called only to discard the next subsequent unused blocks
+    // in the streamEntryList.
+    if (currentStreamIndex + 1 < streamEntries.size()) {
+      ListIterator<BlockDataStreamOutputEntry> streamEntryIterator =
+          streamEntries.listIterator(currentStreamIndex + 1);
+      while (streamEntryIterator.hasNext()) {
+        BlockDataStreamOutputEntry streamEntry = streamEntryIterator.next();
+        Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
+        if ((streamEntry.getPipeline().getId().equals(pipelineId)) ||
+            (containerID != -1 &&
+                streamEntry.getBlockID().getContainerID() == containerID)) {
+          streamEntryIterator.remove();
+        }
+      }
+    }
+  }
+
+  List<BlockDataStreamOutputEntry> getStreamEntries() {
+    return streamEntries;
+  }
+
+  XceiverClientFactory getXceiverClientFactory() {
+    return xceiverClientFactory;
+  }
+
+  String getKeyName() {
+    return keyArgs.getKeyName();
+  }
+
+  /** Returns the sum of the current positions of all stream entries. */
+  long getKeyLength() {
+    return streamEntries.stream().mapToLong(
+        BlockDataStreamOutputEntry::getCurrentPosition).sum();
+  }
+
+  /**
+   * Asks OM to allocate a new block, excluding the nodes, containers and
+   * pipelines in the exclude list, and appends it as a new stream entry.
+   *
+   * @throws IOException if the OM allocateBlock call fails
+   */
+  private void allocateNewBlock() throws IOException {
+    if (!excludeList.isEmpty()) {
+      LOG.debug("Allocating block with {}", excludeList);
+    }
+    OmKeyLocationInfo subKeyInfo =
+        omClient.allocateBlock(keyArgs, openID, excludeList);
+    addKeyLocationInfo(subKeyInfo);
+  }
+
+  /**
+   * Commits the key (or the multipart-upload part) to OM with the written
+   * block locations.
+   *
+   * @param offset total bytes the caller believes were written; must equal
+   *     the sum of the stream entries' positions
+   * @throws IOException if the OM commit call fails
+   */
+  void commitKey(long offset) throws IOException {
+    if (keyArgs != null) {
+      // in test, this could be null
+      long length = getKeyLength();
+      Preconditions.checkArgument(offset == length,
+          "Key length mismatch: expected %s bytes but stream entries hold %s",
+          offset, length);
+      keyArgs.setDataSize(length);
+      keyArgs.setLocationInfoList(getLocationInfoList());
+      // When the key is a multipart upload part, do not commit it as a key:
+      // it is not an actual key, just a part of a large file.
+      if (keyArgs.getIsMultipartKey()) {
+        commitUploadPartInfo =
+            omClient.commitMultipartUploadPart(keyArgs, openID);
+      } else {
+        omClient.commitKey(keyArgs, openID);
+      }
+    } else {
+      LOG.warn("Closing KeyDataStreamOutput, but key args is null");
+    }
+  }
+
+  /** Returns the entry currently being written, or null if there is none. */
+  public BlockDataStreamOutputEntry getCurrentStreamEntry() {
+    // currentStreamIndex starts at 0 and only increases, so this bound check
+    // also covers the empty-list case.
+    return currentStreamIndex < streamEntries.size()
+        ? streamEntries.get(currentStreamIndex) : null;
+  }
+
+  /**
+   * Returns the entry to write to next. If the current entry is closed, the
+   * pool moves to the next one, and it asks OM for a new block when no
+   * entries are left.
+   */
+  BlockDataStreamOutputEntry allocateBlockIfNeeded() throws IOException {
+    BlockDataStreamOutputEntry streamEntry = getCurrentStreamEntry();
+    if (streamEntry != null && streamEntry.isClosed()) {
+      // a stream entry gets closed either by :
+      // a. If the stream gets full
+      // b. it has encountered an exception
+      currentStreamIndex++;
+    }
+    if (streamEntries.size() <= currentStreamIndex) {
+      Preconditions.checkNotNull(omClient);
+      // allocate a new block; if an exception happens it propagates to the
+      // caller directly, and the write fails.
+      allocateNewBlock();
+    }
+    // in theory, this condition should never be violated due to the check
+    // above; still do a sanity check.
+    Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+    return streamEntries.get(currentStreamIndex);
+  }
+
+  long computeBufferData() {
+    return bufferPool.computeBufferData();
+  }
+
+  /** Clears the exclude list, the buffer pool and all stream entries. */
+  void cleanup() {
+    if (excludeList != null) {
+      excludeList.clear();
+    }
+    if (bufferPool != null) {
+      bufferPool.clearBufferPool();
+    }
+
+    if (streamEntries != null) {
+      streamEntries.clear();
+    }
+  }
+
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return commitUploadPartInfo;
+  }
+
+  public ExcludeList getExcludeList() {
+    return excludeList;
+  }
+
+  boolean isEmpty() {
+    return streamEntries.isEmpty();
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
new file mode 100644
index 0000000000..a9be11667c
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Maintains a list of {@link BlockDataStreamOutputEntry} streams, one per
+ * block, and writes data to them based on offset.
+ *
+ * Note that this may write to multiple containers in one write call. If the
+ * write to the first container succeeds but later ones fail, the successful
+ * writes are not rolled back.
+ *
+ * TODO : multi-threaded access is currently not supported.
+ */
+public class KeyDataStreamOutput implements ByteBufStreamOutput {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyDataStreamOutput.class);
+
+  /**
+   * The action requested from handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  // Client settings; not set by the test-only constructor.
+  private OzoneClientConfig config;
+  // Owns the per-block stream entries and talks to OM for new blocks.
+  private final BlockDataStreamOutputEntryPool blockDataStreamOutputEntryPool;
+  // Retry policy to apply, keyed by exception class.
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+
+  private boolean closed;
+  // Set when a write hit an exception and could not complete in full.
+  private boolean isException;
+  private FileEncryptionInfo feInfo;
+  private long clientID;
+
+  // Bytes actually handed to the underlying block streams so far.
+  private long offset;
+  // Bytes accepted through write() calls on this stream so far.
+  private long writeOffset;
+
+  /**
+   * Test-only constructor. It has no config and no OM client, and every
+   * exception listed by HddsClientUtils#getExceptionList maps to a
+   * try-once-then-fail retry policy.
+   */
+  @VisibleForTesting
+  public KeyDataStreamOutput() {
+    this.closed = false;
+    this.retryPolicyMap = HddsClientUtils.getExceptionList().stream()
+        .collect(Collectors.toMap(
+            Function.identity(),
+            ignored -> RetryPolicies.TRY_ONCE_THEN_FAIL));
+    this.retryCount = 0;
+    this.offset = 0;
+    this.blockDataStreamOutputEntryPool = new BlockDataStreamOutputEntryPool();
+  }
+
+  /** Returns the pool's live list of stream entries (for tests). */
+  @VisibleForTesting
+  public List<BlockDataStreamOutputEntry> getStreamEntries() {
+    final BlockDataStreamOutputEntryPool pool = blockDataStreamOutputEntryPool;
+    return pool.getStreamEntries();
+  }
+
+  /** Returns the client factory used by the pool (for tests). */
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    final BlockDataStreamOutputEntryPool pool = blockDataStreamOutputEntryPool;
+    return pool.getXceiverClientFactory();
+  }
+
+  /** Returns location info for the non-empty blocks (for tests). */
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    final BlockDataStreamOutputEntryPool pool = blockDataStreamOutputEntryPool;
+    return pool.getLocationInfoList();
+  }
+
+  /** Returns the current retry counter (for tests). */
+  @VisibleForTesting
+  public int getRetryCount() {
+    return this.retryCount;
+  }
+
+  /** Returns the open-key session id this stream was created with. */
+  @VisibleForTesting
+  public long getClientID() {
+    return this.clientID;
+  }
+
+ @SuppressWarnings({"parameternumber", "squid:S00107"})
+ public KeyDataStreamOutput(
+ OzoneClientConfig config,
+ OpenKeySession handler,
+ XceiverClientFactory xceiverClientManager,
+ OzoneManagerProtocol omClient, int chunkSize,
+ String requestId, ReplicationConfig replicationConfig,
+ String uploadID, int partNumber, boolean isMultipart,
+ boolean unsafeByteBufferConversion
+ ) {
+ this.config = config;
+ OmKeyInfo info = handler.getKeyInfo();
+ blockDataStreamOutputEntryPool =
+ new BlockDataStreamOutputEntryPool(
+ config,
+ omClient,
+ requestId, replicationConfig,
+ uploadID, partNumber,
+ isMultipart, info,
+ unsafeByteBufferConversion,
+ xceiverClientManager,
+ handler.getId());
+
+ // Retrieve the file encryption key info, null if file is not in
+ // encrypted bucket.
+ this.feInfo = info.getFileEncryptionInfo();
+ this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
+ config.getMaxRetryCount(), config.getRetryInterval());
+ this.retryCount = 0;
+ this.isException = false;
+ this.writeOffset = 0;
+ this.clientID = handler.getId();
+ }
+
+  /**
+   * Registers blocks that OM already allocated for this open session, so
+   * that writes use them before any new block is requested. Only blocks
+   * created for {@code openVersion} are taken; blocks from older versions of
+   * the key are ignored, because they must not be written to.
+   *
+   * @param version the location group that holds the pre-allocated blocks.
+   * @param openVersion the key version of the current open session.
+   * @throws IOException propagated from the entry pool.
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    final BlockDataStreamOutputEntryPool pool = blockDataStreamOutputEntryPool;
+    pool.addPreallocateBlocks(version, openVersion);
+  }
+
+  /**
+   * Writes every readable byte of {@code b}, starting at its reader index,
+   * and then adds that count to writeOffset.
+   *
+   * @param b the data; a null value is rejected with NullPointerException.
+   * @throws IOException if checkNotClosed() rejects the call or the write
+   *     fails.
+   */
+  @Override
+  public void write(ByteBuf b) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    final int count = b.readableBytes();
+    final int start = b.readerIndex();
+    handleWrite(b, start, count, false);
+    writeOffset += count;
+  }
+
+  /**
+   * Writes {@code len} bytes of {@code b}, starting at index {@code off},
+   * across as many block entries as needed. An entry is closed through
+   * handleFlushOrClose(FULL) once it has no remaining capacity. On any
+   * failure the stream is marked closed and the exception is rethrown
+   * wrapped in an IOException.
+   *
+   * @param b the data
+   * @param off start index in {@code b}
+   * @param len number of bytes to write; may exceed the int range on the
+   *     retry path
+   * @param retry true when re-writing buffered data after a failure
+   */
+  private void handleWrite(ByteBuf b, int off, long len, boolean retry)
+      throws IOException {
+    while (len > 0) {
+      try {
+        BlockDataStreamOutputEntry current =
+            blockDataStreamOutputEntryPool.allocateBlockIfNeeded();
+        // len is within the int range when the call comes through write(),
+        // but may be long-ranged when it comes via the exception path.
+        // Clamp in long arithmetic before narrowing, so an oversized len
+        // cannot wrap to a negative or wrong int.
+        int expectedWriteLen = (int) Math.min(
+            Math.min(len, current.getRemaining()), Integer.MAX_VALUE);
+        long currentPos = current.getWrittenDataLength();
+        // writtenLength depends on whether the write succeeded, or, if it
+        // hit an exception, how much of it was actually acknowledged.
+        int writtenLength =
+            writeToDataStreamOutput(current, retry, len, b,
+                expectedWriteLen, off, currentPos);
+        if (current.getRemaining() <= 0) {
+          // the current block is full, so close its stream
+          handleFlushOrClose(StreamAction.FULL);
+        }
+        len -= writtenLength;
+        off += writtenLength;
+      } catch (Exception e) {
+        markStreamClosed();
+        throw new IOException(e);
+      }
+    }
+  }
+
+ private int writeToDataStreamOutput(BlockDataStreamOutputEntry current,
+ boolean retry, long len, ByteBuf b, int writeLen, int off,
+ long currentPos) throws IOException {
+ try {
+ if (retry) {
+ current.writeOnRetry(len);
+ } else {
+ current.write(b, off, writeLen);
+ offset += writeLen;
+ }
+ } catch (IOException ioe) {
+ // for the current iteration, totalDataWritten - currentPos gives the
+ // amount of data already written to the buffer
+
+ // In the retryPath, the total data to be written will always be equal
+ // to or less than the max length of the buffer allocated.
+ // The len specified here is the combined sum of the data length of
+ // the buffers
+ Preconditions.checkState(!retry || len <= config
+ .getStreamBufferMaxSize());
+ int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+ writeLen = retry ? (int) len : dataWritten;
+ // In retry path, the data written is already accounted in offset.
+ if (!retry) {
+ offset += writeLen;
+ }
+ LOG.debug("writeLen {}, total len {}", writeLen, len);
+ handleException(current, ioe);
+ }
+ return writeLen;
+ }
+
+ /**
+ * Handles an IOException raised while writing to, flushing or closing the
+ * given stream entry. It performs the following actions:
+ * a. Rewinds the current position of the failed stream entry to the length
+ * acknowledged so far (getTotalAckDataLength).
+ * b. Adds the entry's failed datanodes, and either its container or its
+ * pipeline, to the exclude list, cleans up the entry and discards the
+ * pre-allocated blocks on that container/pipeline.
+ * c. If data is still buffered, re-writes it to the next stream via
+ * handleRetry and resets the retry count afterwards.
+ *
+ * @param streamEntry the stream entry on which the exception occurred
+ * @param exception actual exception that occurred
+ * @throws IOException if the retry policy gives up, the thread is
+ * interrupted, or the retried write fails
+ */
+ private void handleException(BlockDataStreamOutputEntry streamEntry,
+ IOException exception) throws IOException {
+ // unwrap to the root cause that drives the retry/exclusion decisions
+ Throwable t = HddsClientUtils.checkForException(exception);
+ Preconditions.checkNotNull(t);
+ boolean retryFailure = checkForRetryFailure(t);
+ boolean containerExclusionException = false;
+ // a container-level exclusion is only considered when the Ratis client
+ // has not already given up retrying
+ if (!retryFailure) {
+ containerExclusionException = checkIfContainerToExclude(t);
+ }
+ Pipeline pipeline = streamEntry.getPipeline();
+ PipelineID pipelineId = pipeline.getId();
+ long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
+ // set the correct length for the current stream: rewind it to the
+ // acknowledged length
+ streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
+ // data still held by the client that must be re-written elsewhere
+ long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();
+ if (containerExclusionException) {
+ LOG.debug(
+ "Encountered exception {}. The last committed block length is {}, "
+ + "uncommitted data length is {} retry count {}", exception,
+ totalSuccessfulFlushedData, bufferedDataLen, retryCount);
+ } else {
+ LOG.warn(
+ "Encountered exception {} on the pipeline {}. "
+ + "The last committed block length is {}, "
+ + "uncommitted data length is {} retry count {}", exception,
+ pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
+ }
+ // the buffered data must fit in one stream buffer, and together with the
+ // pool's key length it must account for everything accepted so far
+ Preconditions.checkArgument(
+ bufferedDataLen <= config.getStreamBufferMaxSize());
+ Preconditions.checkArgument(
+ offset - blockDataStreamOutputEntryPool.getKeyLength() ==
+ bufferedDataLen);
+ long containerId = streamEntry.getBlockID().getContainerID();
+ Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
+ Preconditions.checkNotNull(failedServers);
+ ExcludeList excludeList = blockDataStreamOutputEntryPool.getExcludeList();
+ if (!failedServers.isEmpty()) {
+ excludeList.addDatanodes(failedServers);
+ }
+
+ // if the container needs to be excluded, add the container to the
+ // exclusion list, otherwise add the pipeline to the exclusion list
+ if (containerExclusionException) {
+ excludeList.addConatinerId(ContainerID.valueOf(containerId));
+ } else {
+ excludeList.addPipeline(pipelineId);
+ }
+ // just clean up the current stream.
+ streamEntry.cleanup(retryFailure);
+
+ // discard all subsequent pre-allocated blocks on the containers and
+ // pipelines which are in the exclude list, so that the very next retry
+ // never writes data on the closed container/pipeline
+ if (containerExclusionException) {
+ // discard subsequent pre-allocated blocks from the streamEntries list
+ // that belong to the closed container
+ blockDataStreamOutputEntryPool
+ .discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
+ null);
+ } else {
+ // In case there is a timeoutException, a watch for commit happening
+ // over majority, or a client connection failure to the leader in the
+ // pipeline, just discard all the pre-allocated blocks on this pipeline.
+ // The next block allocation will exclude this specific pipeline.
+ // This ensures that if a 2-way commit happens, it cannot span over
+ // multiple blocks.
+ blockDataStreamOutputEntryPool
+ .discardPreallocatedBlocks(-1, pipelineId);
+ }
+ if (bufferedDataLen > 0) {
+ // If the data is still cached in the underlying stream, we need to
+ // allocate a new block and write this data in the datanode.
+ handleRetry(exception, bufferedDataLen);
+ // reset the retryCount after handling the exception
+ retryCount = 0;
+ }
+ }
+
+  /**
+   * Cleans up the stream entry pool and then flags this stream as closed.
+   */
+  private void markStreamClosed() {
+    this.blockDataStreamOutputEntryPool.cleanup();
+    this.closed = true;
+  }
+
+ /**
+ * Consults the retry policy for the given exception and, if a retry is
+ * allowed, waits for the suggested delay and re-writes the buffered data.
+ * The policy is looked up by the class of the unwrapped exception, falling
+ * back to the policy registered for Exception.class.
+ *
+ * @param exception the exception that triggered the retry
+ * @param len length of the buffered data to re-write
+ * @throws IOException if the policy decides FAIL, the thread is
+ * interrupted, or the retried write fails; the first two cases go through
+ * setExceptionAndThrow
+ */
+ private void handleRetry(IOException exception, long len) throws IOException {
+ RetryPolicy retryPolicy = retryPolicyMap
+ .get(HddsClientUtils.checkForException(exception).getClass());
+ if (retryPolicy == null) {
+ retryPolicy = retryPolicyMap.get(Exception.class);
+ }
+ RetryPolicy.RetryAction action = null;
+ try {
+ // arguments: retries so far, failovers (0), idempotent (true)
+ action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
+ } catch (Exception e) {
+ setExceptionAndThrow(new IOException(e));
+ }
+ // action is non-null here: setExceptionAndThrow above always throws
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+ String msg = "";
+ if (action.reason != null) {
+ msg = "Retry request failed. " + action.reason;
+ LOG.error(msg, exception);
+ }
+ setExceptionAndThrow(new IOException(msg, exception));
+ }
+
+ // Throw the exception if the thread is interrupted
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("Interrupted while trying for retry");
+ setExceptionAndThrow(exception);
+ }
+ Preconditions.checkArgument(
+ action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
+ if (action.delayMillis > 0) {
+ try {
+ Thread.sleep(action.delayMillis);
+ } catch (InterruptedException e) {
+ // restore the interrupt flag before failing the stream
+ Thread.currentThread().interrupt();
+ IOException ioe = (IOException) new InterruptedIOException(
+ "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
+ .initCause(e);
+ setExceptionAndThrow(ioe);
+ }
+ }
+ retryCount++;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Retrying Write request. Already tried {} time(s); " +
+ "retry policy is {} ", retryCount, retryPolicy);
+ }
+ // retry=true: handleWrite re-sends the buffered data through
+ // writeOnRetry(len), so no source byte array is needed
+ handleWrite(null, 0, len, true);
+ }
+
+  /**
+   * Records that this stream hit an unrecoverable error (close() then skips
+   * its offset consistency check) and throws the given exception.
+   *
+   * @param ioe the exception to throw
+   * @throws IOException always; {@code ioe} itself
+   */
+  private void setExceptionAndThrow(IOException ioe) throws IOException {
+    this.isException = true;
+    throw ioe;
+  }
+
+  /**
+   * Tells whether {@code t} means the Ratis client has given up retrying.
+   * On retry failure the Ratis client throws RaftRetryFailureException, and
+   * every following operation fails with AlreadyClosedException, so both
+   * count as a retry failure.
+   */
+  private boolean checkForRetryFailure(Throwable t) {
+    if (t instanceof RaftRetryFailureException) {
+      return true;
+    }
+    return t instanceof AlreadyClosedException;
+  }
+
+ // Every container specific exception from datatnode will be seen as
+ // StorageContainerException
+ private boolean checkIfContainerToExclude(Throwable t) {
+ return t instanceof StorageContainerException;
+ }
+
+  /**
+   * Flushes the current stream entry, handling failures by moving the
+   * buffered data to a new block and trying again.
+   *
+   * @throws IOException if the stream is already closed or the flush fails
+   */
+  @Override
+  public void flush() throws IOException {
+    checkNotClosed();
+    final StreamAction action = StreamAction.FLUSH;
+    handleFlushOrClose(action);
+  }
+
+  /**
+   * Flushes or closes the current stream entry, depending on {@code op}.
+   * It is called when the current entry fills up during a write, or when
+   * the client explicitly flushes or closes the stream. If the action fails,
+   * handleException excludes the failed nodes and re-writes the buffered
+   * data to a new block, and the action is attempted again on the new
+   * current entry. With FULL, an entry is only closed when it has no
+   * remaining space, so a block that just received retried data stays open.
+   * Any exception that escapes marks the whole stream closed.
+   *
+   * @param op whether to flush, close, or close-if-full the current entry
+   * @throws IOException if the action cannot be completed
+   */
+  @SuppressWarnings("squid:S1141")
+  private void handleFlushOrClose(StreamAction op) throws IOException {
+    if (blockDataStreamOutputEntryPool.isEmpty()) {
+      return;
+    }
+    boolean done = false;
+    while (!done) {
+      try {
+        BlockDataStreamOutputEntry current =
+            blockDataStreamOutputEntryPool.getCurrentStreamEntry();
+        if (current == null) {
+          done = true;
+        } else {
+          try {
+            handleStreamAction(current, op);
+            done = true;
+          } catch (IOException ioe) {
+            // recover onto a new entry, then loop to retry the action there
+            handleException(current, ioe);
+          }
+        }
+      } catch (Exception e) {
+        markStreamClosed();
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Applies the requested action to a single stream entry. First, any
+   * datanodes the entry has recorded as failed are added to the pool's
+   * exclude list.
+   *
+   * @param entry the stream entry to act on
+   * @param op CLOSE always closes the entry, FULL closes it only when it has
+   *           no remaining space, FLUSH flushes it
+   * @throws IOException if the entry action fails, or for an unknown action
+   */
+  private void handleStreamAction(BlockDataStreamOutputEntry entry,
+      StreamAction op) throws IOException {
+    Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+    // failed servers can be null in case there is no data written in
+    // the stream, so guard before dereferencing.
+    if (failedServers != null && !failedServers.isEmpty()) {
+      blockDataStreamOutputEntryPool.getExcludeList().addDatanodes(
+          failedServers);
+    }
+    switch (op) {
+    case CLOSE:
+      entry.close();
+      break;
+    case FULL:
+      if (entry.getRemaining() == 0) {
+        entry.close();
+      }
+      break;
+    case FLUSH:
+      entry.flush();
+      break;
+    default:
+      throw new IOException("Invalid Operation");
+    }
+  }
+
+  /**
+   * Closes the current stream entry and commits the key to OM with the
+   * number of bytes written, which adds the blocks as the new key blocks.
+   * Calling close on an already closed stream does nothing. The entry pool
+   * is cleaned up whether or not the commit succeeds.
+   *
+   * @throws IOException if closing the entry or committing the key fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closed = true;
+      try {
+        handleFlushOrClose(StreamAction.CLOSE);
+        // offsets are only expected to match when no write failed
+        if (!isException) {
+          Preconditions.checkArgument(writeOffset == offset);
+        }
+        blockDataStreamOutputEntryPool.commitKey(offset);
+      } finally {
+        blockDataStreamOutputEntryPool.cleanup();
+      }
+    }
+  }
+
+  /**
+   * @return the multipart commit-upload-part info held by the entry pool
+   */
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    final OmMultipartCommitUploadPartInfo partInfo =
+        blockDataStreamOutputEntryPool.getCommitUploadPartInfo();
+    return partInfo;
+  }
+
+  /**
+   * @return the file encryption info stored in this stream
+   */
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return this.feInfo;
+  }
+
+  /**
+   * @return the exclude list maintained by the entry pool
+   */
+  @VisibleForTesting
+  public ExcludeList getExcludeList() {
+    final ExcludeList excluded =
+        blockDataStreamOutputEntryPool.getExcludeList();
+    return excluded;
+  }
+
+  /**
+   * Builder class of KeyDataStreamOutput. Fields that are never set keep
+   * their Java defaults (null, 0 or false).
+   */
+  public static class Builder {
+    private OpenKeySession openHandler;
+    private XceiverClientFactory xceiverManager;
+    private OzoneManagerProtocol omClient;
+    private int chunkSize;
+    private String requestID;
+    private String multipartUploadID;
+    private int multipartNumber;
+    private boolean isMultipartKey;
+    private boolean unsafeByteBufferConversion;
+    private OzoneClientConfig clientConfig;
+    private ReplicationConfig replicationConfig;
+
+    /** Sets the multipart upload ID passed to the stream. */
+    public Builder setMultipartUploadID(String uploadID) {
+      multipartUploadID = uploadID;
+      return this;
+    }
+
+    /** Sets the multipart part number passed to the stream. */
+    public Builder setMultipartNumber(int partNumber) {
+      multipartNumber = partNumber;
+      return this;
+    }
+
+    /** Sets the open key session returned by OM. */
+    public Builder setHandler(OpenKeySession handler) {
+      openHandler = handler;
+      return this;
+    }
+
+    /** Sets the factory used to obtain datanode clients. */
+    public Builder setXceiverClientManager(XceiverClientFactory manager) {
+      xceiverManager = manager;
+      return this;
+    }
+
+    /** Sets the Ozone Manager client. */
+    public Builder setOmClient(OzoneManagerProtocol client) {
+      omClient = client;
+      return this;
+    }
+
+    /** Sets the chunk size passed to the stream. */
+    public Builder setChunkSize(int size) {
+      chunkSize = size;
+      return this;
+    }
+
+    /** Sets the request ID passed to the stream. */
+    public Builder setRequestID(String id) {
+      requestID = id;
+      return this;
+    }
+
+    /** Marks whether the key is part of a multipart upload. */
+    public Builder setIsMultipartKey(boolean isMultipart) {
+      isMultipartKey = isMultipart;
+      return this;
+    }
+
+    /** Sets the client configuration. */
+    public Builder setConfig(OzoneClientConfig config) {
+      clientConfig = config;
+      return this;
+    }
+
+    /** Enables or disables unsafe ByteBuffer conversion. */
+    public Builder enableUnsafeByteBufferConversion(boolean enabled) {
+      unsafeByteBufferConversion = enabled;
+      return this;
+    }
+
+    /** Sets the replication configuration of the key. */
+    public Builder setReplicationConfig(ReplicationConfig replConfig) {
+      replicationConfig = replConfig;
+      return this;
+    }
+
+    /**
+     * @return a new KeyDataStreamOutput built from the values set so far
+     */
+    public KeyDataStreamOutput build() {
+      return new KeyDataStreamOutput(clientConfig, openHandler,
+          xceiverManager, omClient, chunkSize, requestID, replicationConfig,
+          multipartUploadID, multipartNumber, isMultipartKey,
+          unsafeByteBufferConversion);
+    }
+  }
+
+  /**
+   * Verifies that the output stream is open. Non blocking; this only looks
+   * at the last state of the volatile {@link #closed} field.
+   *
+   * @throws IOException if the stream has already been closed
+   */
+  private void checkNotClosed() throws IOException {
+    if (!closed) {
+      return;
+    }
+    String message = ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+        + blockDataStreamOutputEntryPool.getKeyName();
+    throw new IOException(message);
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
new file mode 100644
index 0000000000..378b86872e
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+
+import java.io.IOException;
+
+/**
+ * OzoneDataStreamOutput is used to write data into Ozone.
+ * It uses SCM's {@link KeyDataStreamOutput} for writing the data.
+ */
+public class OzoneDataStreamOutput implements ByteBufStreamOutput {
+
+ private final ByteBufStreamOutput byteBufStreamOutput;
+
+ /**
+ * Constructs OzoneDataStreamOutput with KeyDataStreamOutput.
+ *
+ * @param byteBufStreamOutput
+ */
+ public OzoneDataStreamOutput(ByteBufStreamOutput byteBufStreamOutput) {
+ this.byteBufStreamOutput = byteBufStreamOutput;
+ }
+
+ @Override
+ public void write(ByteBuf b) throws IOException {
+ byteBufStreamOutput.write(b);
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ byteBufStreamOutput.flush();
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ //commitKey can be done here, if needed.
+ byteBufStreamOutput.close();
+ }
+
+ public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+ if (byteBufStreamOutput instanceof KeyDataStreamOutput) {
+ return ((KeyDataStreamOutput)
+ byteBufStreamOutput).getCommitUploadPartInfo();
+ }
+ // Otherwise return null.
+ return null;
+ }
+
+ public ByteBufStreamOutput getByteBufStreamOutput() {
+ return byteBufStreamOutput;
+ }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 2f6e666fd0..f15934f01e 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.TenantArgs;
import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -331,6 +332,20 @@ public interface ClientProtocol {
Map<String, String> metadata)
throws IOException;
+ /**
+ * Creates a key in an existing bucket and returns a stream for writing
+ * its data with the streaming API.
+ * @param volumeName Name of the Volume
+ * @param bucketName Name of the Bucket
+ * @param keyName Name of the Key
+ * @param size Size of the data
+ * @param replicationConfig Replication configuration of the key
+ * @param metadata custom key value metadata
+ * @return {@link OzoneDataStreamOutput}
+ * @throws IOException if the key cannot be created
+ */
+ OzoneDataStreamOutput createStreamKey(String volumeName, String bucketName,
+ String keyName, long size, ReplicationConfig replicationConfig,
+ Map<String, String> metadata)
+ throws IOException;
/**
* Reads a key from an existing bucket.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 42aeba310d..0d7bef5a8b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -94,11 +94,13 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.LengthInputStream;
import org.apache.hadoop.ozone.client.io.MultipartCryptoKeyInputStream;
import org.apache.hadoop.ozone.client.io.OzoneCryptoInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -1213,6 +1215,48 @@ public class RpcClient implements ClientProtocol {
return createOutputStream(openKey, requestId);
}
+  /**
+   * Validates the names, opens the key in OM and returns an
+   * {@link OzoneDataStreamOutput} for writing its data with the streaming
+   * API. When the metadata sets {@link OzoneConsts#GDPR_FLAG} to true, a
+   * fresh GDPR symmetric key is generated and its details are added to the
+   * key metadata.
+   */
+  @Override
+  public OzoneDataStreamOutput createStreamKey(
+      String volumeName, String bucketName, String keyName, long size,
+      ReplicationConfig replicationConfig,
+      Map<String, String> metadata)
+      throws IOException {
+    verifyVolumeName(volumeName);
+    verifyBucketName(bucketName);
+    if (checkKeyNameEnabled) {
+      HddsClientUtils.verifyKeyName(keyName);
+    }
+    HddsClientUtils.checkNotNull(keyName, replicationConfig);
+    String requestId = UUID.randomUUID().toString();
+
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .setReplicationConfig(replicationConfig)
+        .addAllMetadata(metadata)
+        .setAcls(getAclList());
+
+    if (Boolean.parseBoolean(metadata.get(OzoneConsts.GDPR_FLAG))) {
+      try {
+        GDPRSymmetricKey gKey = new GDPRSymmetricKey(new SecureRandom());
+        builder.addAllMetadata(gKey.getKeyDetails());
+      } catch (Exception e) {
+        // getMessage() may be null; check it so a NullPointerException does
+        // not replace the real cause thrown below.
+        String message = e.getMessage();
+        if (e instanceof InvalidKeyException && message != null
+            && message.contains("Illegal key size or default parameters")) {
+          LOG.error("Missing Unlimited Strength Policy jars. Please install " +
+              "Java Cryptography Extension (JCE) Unlimited Strength " +
+              "Jurisdiction Policy Files");
+        }
+        throw new IOException(e);
+      }
+    }
+
+    OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
+    return createDataStreamOutput(openKey, requestId, replicationConfig);
+  }
+
private KeyProvider.KeyVersion getDEK(FileEncryptionInfo feInfo)
throws IOException {
// check crypto protocol version
@@ -1964,6 +2008,24 @@ public class RpcClient implements ClientProtocol {
cryptoInputStreams);
}
}
+  /**
+   * Builds a KeyDataStreamOutput for the opened key, registers the key's
+   * pre-allocated blocks with it and wraps it for the caller.
+   */
+  private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey,
+      String requestId, ReplicationConfig replicationConfig)
+      throws IOException {
+    KeyDataStreamOutput.Builder streamBuilder = new KeyDataStreamOutput.Builder()
+        .setHandler(openKey)
+        .setXceiverClientManager(xceiverClientManager)
+        .setOmClient(ozoneManagerClient)
+        .setRequestID(requestId)
+        .setReplicationConfig(replicationConfig)
+        .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+        .setConfig(clientConfig);
+    KeyDataStreamOutput keyDataStreamOutput = streamBuilder.build();
+    keyDataStreamOutput.addPreallocateBlocks(
+        openKey.getKeyInfo().getLatestVersionLocations(),
+        openKey.getOpenVersion());
+    return new OzoneDataStreamOutput(keyDataStreamOutput);
+  }
private OzoneOutputStream createOutputStream(OpenKeySession openKey,
String requestId) throws IOException {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
new file mode 100644
index 0000000000..4d52d89490
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import io.netty.buffer.Unpooled;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests BlockDataStreamOutput class.
+ */
+public class TestBlockDataStreamOutput {
+
+  /**
+   * Set a timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+
+  /**
+   * Starts a MiniOzoneCluster with small chunk, flush and block sizes, then
+   * creates the client, volume and bucket shared by all tests.
+   *
+   * @throws Exception if the cluster or the test volume/bucket cannot be
+   *                   set up
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setChecksumType(ChecksumType.NONE);
+    clientConfig.setStreamBufferFlushDelay(false);
+    conf.setFromObject(clientConfig);
+
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    // client, volume and bucket are shared by all tests in this class
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testblockoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * Closes the client, then shuts down the MiniOzoneCluster even if closing
+   * the client fails.
+   *
+   * @throws IOException if closing the client fails
+   */
+  @AfterClass
+  public static void shutdown() throws IOException {
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testMultiChunkWrite() throws Exception {
+    // write data less than 1 chunk size use streaming.
+    writeAndValidate(chunkSize / 2);
+    // write data more than 1 chunk size use streaming.
+    writeAndValidate(chunkSize + 50);
+    // write data more than 1 block size use streaming.
+    writeAndValidate(blockSize + 50);
+  }
+
+  /**
+   * Writes {@code dataLength} bytes to a new RATIS key through the streaming
+   * API in a single write call, closes the stream and checks the stored
+   * data.
+   */
+  private void writeAndValidate(int dataLength) throws Exception {
+    String keyName = getKeyName();
+    OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(Unpooled.copiedBuffer(data));
+    // now close the stream, It will update the key length.
+    key.close();
+    validateData(keyName, data);
+  }
+
+  private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return TestHelper.createStreamKey(
+        keyName, type, size, objectStore, volumeName, bucketName);
+  }
+
+  private void validateData(String keyName, byte[] data) throws Exception {
+    TestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
+  }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index dae6e383f8..cf3a51241e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -134,8 +135,23 @@ public final class TestHelper {
}
org.apache.hadoop.hdds.client.ReplicationFactor factor =
org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+ ReplicationConfig config =
+ ReplicationConfig.fromTypeAndFactor(type, factor);
return objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey(keyName, size, type, factor, new HashMap<>());
+ .createKey(keyName, size, config, new HashMap<>());
+ }
+
+  /**
+   * Creates a key for writing through the streaming API. STAND_ALONE keys
+   * use replication factor ONE; any other type uses THREE.
+   */
+  public static OzoneDataStreamOutput createStreamKey(String keyName,
+      ReplicationType type, long size, ObjectStore objectStore,
+      String volumeName, String bucketName) throws Exception {
+    org.apache.hadoop.hdds.client.ReplicationFactor factor;
+    if (type == ReplicationType.STAND_ALONE) {
+      factor = org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+    } else {
+      factor = org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+    }
+    ReplicationConfig replicationConfig =
+        ReplicationConfig.fromTypeAndFactor(type, factor);
+    return objectStore.getVolume(volumeName).getBucket(bucketName)
+        .createStreamKey(keyName, size, replicationConfig, new HashMap<>());
 }
public static OzoneOutputStream createKey(String keyName,
@@ -143,8 +159,10 @@ public final class TestHelper {
org.apache.hadoop.hdds.client.ReplicationFactor factor, long size,
ObjectStore objectStore, String volumeName, String bucketName)
throws Exception {
+ ReplicationConfig config =
+ ReplicationConfig.fromTypeAndFactor(type, factor);
return objectStore.getVolume(volumeName).getBucket(bucketName)
- .createKey(keyName, size, type, factor, new HashMap<>());
+ .createKey(keyName, size, config, new HashMap<>());
}
public static OzoneOutputStream createKey(String keyName,
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index 7d7885d168..e4b842eaca 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -23,9 +23,14 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.io.IOUtils;
@@ -34,6 +39,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientException;
import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.shell.OzoneAddress;
import org.apache.commons.codec.digest.DigestUtils;
@@ -89,10 +95,36 @@ public class PutKeyHandler extends KeyHandler {
int chunkSize = (int) getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES);
- try (InputStream input = new FileInputStream(dataFile);
- OutputStream output = bucket.createKey(keyName, dataFile.length(),
- replicationConfig, keyMetadata)) {
- IOUtils.copyBytes(input, output, chunkSize);
+
+ if (dataFile.length() <= chunkSize) {
+ if (isVerbose()) {
+ out().println("API: async");
+ }
+ try (InputStream input = new FileInputStream(dataFile);
+ OutputStream output = bucket.createKey(keyName, dataFile.length(),
+ replicationConfig, keyMetadata)) {
+ IOUtils.copyBytes(input, output, chunkSize);
+ }
+ } else {
+ if (isVerbose()) {
+ out().println("API: streaming");
+ }
+ try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r");
+ OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
+ dataFile.length(), replicationConfig, keyMetadata)) {
+ FileChannel ch = raf.getChannel();
+ long len = raf.length();
+ long off = 0;
+ while (len > 0) {
+ long writeLen = Math.min(len, chunkSize);
+ ByteBuffer segment =
+ ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
+ ByteBuf buf = Unpooled.wrappedBuffer(segment);
+ out.write(buf);
+ off += writeLen;
+ len -= writeLen;
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org