You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by pi...@apache.org on 2021/10/06 16:35:52 UTC

[ozone] branch master updated: HDDS-5816 Rearrange code and refactor some logic into new methods in prep for EC addition. (#2709)

This is an automated email from the ASF dual-hosted git repository.

pifta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ea81ef8  HDDS-5816 Rearrange code and refactor some logic into new methods in prep for EC addition. (#2709)
ea81ef8 is described below

commit ea81ef820827cdfca188c3881538d65f7359d375
Author: Istvan Fajth <pi...@cloudera.com>
AuthorDate: Wed Oct 6 18:35:29 2021 +0200

    HDDS-5816 Rearrange code and refactor some logic into new methods in prep for EC addition. (#2709)
---
 .../ozone/client/io/BlockOutputStreamEntry.java    | 268 ++++++++++++++-------
 .../client/io/BlockOutputStreamEntryPool.java      | 109 +++++++--
 .../hadoop/ozone/client/io/KeyOutputStream.java    |   3 +-
 3 files changed, 275 insertions(+), 105 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 548587c..45e8473 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -36,9 +36,14 @@ import org.apache.hadoop.security.token.Token;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * Helper class used inside {@link BlockOutputStream}.
+ * A BlockOutputStreamEntry manages the data writes into the DataNodes.
+ * It wraps BlockOutputStreams that are connecting to the DataNodes,
+ * and keeps track of the length of the data successfully written.
+ *
+ * The base implementation handles Ratis-3 writes with a single stream,
+ * but other implementations may write the data in a different way.
  * */
-public final class BlockOutputStreamEntry extends OutputStream {
+public class BlockOutputStreamEntry extends OutputStream {
 
   private final OzoneClientConfig config;
   private OutputStream outputStream;
@@ -55,7 +60,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
   private BufferPool bufferPool;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
-  private BlockOutputStreamEntry(
+  BlockOutputStreamEntry(
       BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
@@ -76,74 +81,93 @@ public final class BlockOutputStreamEntry extends OutputStream {
     this.bufferPool = bufferPool;
   }
 
-  long getLength() {
-    return length;
-  }
-
-  Token<OzoneBlockTokenIdentifier> getToken() {
-    return token;
-  }
-
-  long getRemaining() {
-    return length - currentPosition;
-  }
-
   /**
    * BlockOutputStream is initialized in this function. This makes sure that
    * xceiverClient initialization is not done during preallocation and only
    * done when data is written.
    * @throws IOException if xceiverClient initialization fails
    */
-  private void checkStream() throws IOException {
-    if (this.outputStream == null) {
-      this.outputStream =
-          new RatisBlockOutputStream(blockID, xceiverClientManager,
-              pipeline, bufferPool, config, token);
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      createOutputStream();
     }
   }
 
+  /**
+   * Creates the outputStreams that are necessary to start the write.
+   * Implementors can override this to instantiate multiple streams instead.
+   * @throws IOException if creating the output stream fails.
+   */
+  void createOutputStream() throws IOException {
+    outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
+        pipeline, bufferPool, config, token);
+  }
 
   @Override
   public void write(int b) throws IOException {
     checkStream();
-    outputStream.write(b);
-    this.currentPosition += 1;
+    getOutputStream().write(b);
+    incCurrentPosition();
   }
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     checkStream();
-    outputStream.write(b, off, len);
-    this.currentPosition += len;
+    getOutputStream().write(b, off, len);
+    incCurrentPosition(len);
+  }
+
+  void writeOnRetry(long len) throws IOException {
+    checkStream();
+    BlockOutputStream out = (BlockOutputStream) getOutputStream();
+    out.writeOnRetry(len);
+    incCurrentPosition(len);
   }
 
   @Override
   public void flush() throws IOException {
-    if (this.outputStream != null) {
-      this.outputStream.flush();
+    if (isInitialized()) {
+      getOutputStream().flush();
     }
   }
 
   @Override
   public void close() throws IOException {
-    if (this.outputStream != null) {
-      this.outputStream.close();
+    if (isInitialized()) {
+      getOutputStream().close();
       // after closing the chunkOutPutStream, blockId would have been
       // reconstructed with updated bcsId
-      this.blockID = ((BlockOutputStream) outputStream).getBlockID();
+      this.blockID = ((BlockOutputStream) getOutputStream()).getBlockID();
     }
   }
 
   boolean isClosed() {
-    if (outputStream != null) {
-      return  ((BlockOutputStream) outputStream).isClosed();
+    if (isInitialized()) {
+      return  ((BlockOutputStream) getOutputStream()).isClosed();
     }
     return false;
   }
 
+  void cleanup(boolean invalidateClient) throws IOException {
+    checkStream();
+    BlockOutputStream out = (BlockOutputStream) getOutputStream();
+    out.cleanup(invalidateClient);
+  }
+
+  /**
+   * If the underlying BlockOutputStream implements acknowledgement of the
+   * writes, this method returns the total number of bytes acknowledged to be
+   * stored by the DataNode peers.
+   * The default stream implementation returns zero, and if the used stream
+   * does not implement acknowledgement, this method returns zero.
+   *
+   * @return the number of bytes acknowledged by the underlying
+   *    BlockOutputStream, or zero if acknowledgment logic is not implemented,
+   *    or the entry is not initialized.
+   */
   long getTotalAckDataLength() {
-    if (outputStream != null) {
-      BlockOutputStream out = (BlockOutputStream) this.outputStream;
+    if (isInitialized()) {
+      BlockOutputStream out = (BlockOutputStream) getOutputStream();
       blockID = out.getBlockID();
       return out.getTotalAckDataLength();
     } else {
@@ -154,17 +178,13 @@ public final class BlockOutputStreamEntry extends OutputStream {
     }
   }
 
-  Collection<DatanodeDetails> getFailedServers() {
-    if (outputStream != null) {
-      BlockOutputStream out = (BlockOutputStream) this.outputStream;
-      return out.getFailedServers();
-    }
-    return Collections.emptyList();
-  }
-
+  /**
+   * Returns the number of bytes the entry attempted to send towards the
+   * DataNodes in write calls that completed without an exception.
+   */
   long getWrittenDataLength() {
-    if (outputStream != null) {
-      BlockOutputStream out = (BlockOutputStream) this.outputStream;
+    if (isInitialized()) {
+      BlockOutputStream out = (BlockOutputStream) getOutputStream();
       return out.getWrittenDataLength();
     } else {
       // For a pre allocated block for which no write has been initiated,
@@ -174,19 +194,127 @@ public final class BlockOutputStreamEntry extends OutputStream {
     }
   }
 
-  void cleanup(boolean invalidateClient) throws IOException {
-    checkStream();
-    BlockOutputStream out = (BlockOutputStream) this.outputStream;
-    out.cleanup(invalidateClient);
+  Collection<DatanodeDetails> getFailedServers() {
+    if (isInitialized()) {
+      BlockOutputStream out = (BlockOutputStream) getOutputStream();
+      return out.getFailedServers();
+    }
+    return Collections.emptyList();
+  }
 
+  /**
+   * Used to decide if the wrapped output stream is created already or not.
+   * @return true if the wrapped stream is already initialized.
+   */
+  boolean isInitialized() {
+    return getOutputStream() != null;
   }
 
-  void writeOnRetry(long len) throws IOException {
-    checkStream();
-    BlockOutputStream out = (BlockOutputStream) this.outputStream;
-    out.writeOnRetry(len);
-    this.currentPosition += len;
+  /**
+   * Gets the intended length of the key to be written.
+   * @return the length to be written into the key.
+   */
+  //TODO: this does not belong here...
+  long getLength() {
+    return this.length;
+  }
+
+  /**
+   * Gets the block token that is used to authenticate during the write.
+   * @return the block token for writing the data
+   */
+  Token<OzoneBlockTokenIdentifier> getToken() {
+    return this.token;
+  }
+
+  /**
+   * Gets the amount of bytes remaining from the full write.
+   * @return the amount of bytes to still be written to the key
+   */
+  //TODO: this does not belong here...
+  long getRemaining() {
+    return getLength() - getCurrentPosition();
+  }
 
+  /**
+   * Increases current position by the given length. Used in writes.
+   *
+   * @param len the amount of bytes to increase position with.
+   */
+  void incCurrentPosition(long len) {
+    currentPosition += len;
+  }
+
+  /**
+   * Increases current position by one. Used in writes.
+   */
+  void incCurrentPosition(){
+    currentPosition++;
+  }
+
+  /**
+   * In case of a failure this method can be used to reset the position back to
+   * the last position acked by a node before a write failure.
+   */
+  void resetToAckedPosition() {
+    currentPosition = getTotalAckDataLength();
+  }
+
+  @VisibleForTesting
+  public OutputStream getOutputStream() {
+    return this.outputStream;
+  }
+
+  @VisibleForTesting
+  public BlockID getBlockID() {
+    return this.blockID;
+  }
+
+  /**
+   * During writes a block ID might change as BCSID's are increasing.
+   * Implementors might account these changes, and return a different block id
+   * here.
+   * @param id the last known ID of the block.
+   */
+  void updateBlockID(BlockID id) {
+    this.blockID = id;
+  }
+
+  OzoneClientConfig getConf(){
+    return this.config;
+  }
+
+  XceiverClientFactory getXceiverClientManager() {
+    return this.xceiverClientManager;
+  }
+
+  /**
+   * Gets the original Pipeline this entry is initialized with.
+   * @return the original pipeline
+   */
+  @VisibleForTesting
+  public Pipeline getPipeline() {
+    return this.pipeline;
+  }
+
+  /**
+   * Gets the Pipeline based on which the location report can be sent to the OM.
+   * This is necessary, as implementors might use special pipeline information
+   * that can be created during commit, but not during initialization,
+   * and might need to update some Pipeline information returned in
+   * OMKeyLocationInfo.
+   * @return the Pipeline to use for the location report sent to the OM.
+   */
+  Pipeline getPipelineForOMLocationReport(){
+    return getPipeline();
+  }
+
+  long getCurrentPosition() {
+    return this.currentPosition;
+  }
+
+  BufferPool getBufferPool() {
+    return this.bufferPool;
   }
 
   /**
@@ -214,8 +342,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
     }
 
     public Builder setXceiverClientManager(
-        XceiverClientFactory
-        xClientManager) {
+        XceiverClientFactory xClientManager) {
       this.xceiverClientManager = xClientManager;
       return this;
     }
@@ -257,39 +384,6 @@ public final class BlockOutputStreamEntry extends OutputStream {
           token, config);
     }
   }
-
-  @VisibleForTesting
-  public OutputStream getOutputStream() {
-    return outputStream;
-  }
-
-  public BlockID getBlockID() {
-    return blockID;
-  }
-
-  public String getKey() {
-    return key;
-  }
-
-  public XceiverClientFactory getXceiverClientManager() {
-    return xceiverClientManager;
-  }
-
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  public long getCurrentPosition() {
-    return currentPosition;
-  }
-
-  public BufferPool getBufferPool() {
-    return bufferPool;
-  }
-
-  public void setCurrentPosition(long curPosition) {
-    this.currentPosition = curPosition;
-  }
 }
 
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index e9147a8..38f0aa8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -44,21 +44,37 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class manages the stream entries list and handles block allocation
- * from OzoneManager.
+ * A BlockOutputStreamEntryPool manages the communication with OM during writing
+ * a Key to Ozone with {@link KeyOutputStream}.
+ * Block allocation, handling of pre-allocated blocks, and managing stream
+ * entries that represent a writing channel towards DataNodes are the main
+ * responsibility of this class.
  */
 public class BlockOutputStreamEntryPool {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(BlockOutputStreamEntryPool.class);
 
+  /**
+   * List of stream entries that are used to write a block of data.
+   */
   private final List<BlockOutputStreamEntry> streamEntries;
   private final OzoneClientConfig config;
+  /**
+   * Index of the stream entry we are currently writing into. Note that a
+   * stream entry is allowed to manage more streams, as for example in the EC
+   * write case, where an entry represents an EC block group.
+   */
   private int currentStreamIndex;
   private final OzoneManagerProtocol omClient;
   private final OmKeyArgs keyArgs;
   private final XceiverClientFactory xceiverClientFactory;
   private final String requestID;
+  /**
+   * A {@link BufferPool} shared between all
+   * {@link org.apache.hadoop.hdds.scm.storage.BlockOutputStream}s managed by
+   * the entries in the pool.
+   */
   private final BufferPool bufferPool;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private final long openID;
@@ -145,9 +161,17 @@ public class BlockOutputStreamEntryPool {
     }
   }
 
-  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
-    Preconditions.checkNotNull(subKeyInfo.getPipeline());
-    BlockOutputStreamEntry.Builder builder =
+  /**
+   * Method to create a stream entry instance based on the
+   * {@link OmKeyLocationInfo}.
+   * If implementations require additional data to create the entry, they need
+   * to get that data before starting to create entries.
+   * @param subKeyInfo the {@link OmKeyLocationInfo} object that describes the
+   *                   key to be written.
+   * @return a BlockOutputStreamEntry instance that handles how data is written.
+   */
+  BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+    return
         new BlockOutputStreamEntry.Builder()
             .setBlockID(subKeyInfo.getBlockID())
             .setKey(keyArgs.getKeyName())
@@ -156,22 +180,45 @@ public class BlockOutputStreamEntryPool {
             .setConfig(config)
             .setLength(subKeyInfo.getLength())
             .setBufferPool(bufferPool)
-            .setToken(subKeyInfo.getToken());
-    streamEntries.add(builder.build());
+            .setToken(subKeyInfo.getToken())
+            .build();
+  }
+
+  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+    Preconditions.checkNotNull(subKeyInfo.getPipeline());
+    streamEntries.add(createStreamEntry(subKeyInfo));
   }
 
-  public List<OmKeyLocationInfo> getLocationInfoList()  {
+  /**
+   * Returns the list of {@link OmKeyLocationInfo} objects that describe to OM
+   * where the blocks of the key have been written.
+   * @return the location info list of written blocks.
+   */
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    List<OmKeyLocationInfo> locationInfoList;
+    List<OmKeyLocationInfo> currBlocksLocationInfoList =
+        getOmKeyLocationInfos(streamEntries);
+    locationInfoList = currBlocksLocationInfoList;
+    return locationInfoList;
+  }
+
+  private List<OmKeyLocationInfo> getOmKeyLocationInfos(
+      List<BlockOutputStreamEntry> streams) {
     List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
-    for (BlockOutputStreamEntry streamEntry : streamEntries) {
+    for (BlockOutputStreamEntry streamEntry : streams) {
       long length = streamEntry.getCurrentPosition();
 
       // Commit only those blocks to OzoneManager which are not empty
       if (length != 0) {
         OmKeyLocationInfo info =
-            new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
-                .setLength(streamEntry.getCurrentPosition()).setOffset(0)
+            new OmKeyLocationInfo.Builder()
+                .setBlockID(streamEntry.getBlockID())
+                .setLength(streamEntry.getCurrentPosition())
+                .setOffset(0)
                 .setToken(streamEntry.getToken())
-                .setPipeline(streamEntry.getPipeline()).build();
+                .setPipeline(streamEntry.getPipelineForOMLocationReport())
+                .build();
         locationInfoList.add(info);
       }
       if (LOG.isDebugEnabled()) {
@@ -185,6 +232,19 @@ public class BlockOutputStreamEntryPool {
   }
 
   /**
+   * Retrieves the {@link BufferPool} instance shared between managed block
+   * output stream entries.
+   * @return the shared buffer pool.
+   */
+  public BufferPool getBufferPool() {
+    return this.bufferPool;
+  }
+
+  OzoneClientConfig getConfig() {
+    return config;
+  }
+
+  /**
    * Discards the subsequent pre allocated blocks and removes the streamEntries
    * from the streamEntries list for the container which is closed.
    * @param containerID id of the closed container
@@ -211,10 +271,12 @@ public class BlockOutputStreamEntryPool {
     }
   }
 
+  @VisibleForTesting
   List<BlockOutputStreamEntry> getStreamEntries() {
     return streamEntries;
   }
 
+  @VisibleForTesting
   XceiverClientFactory getXceiverClientFactory() {
     return xceiverClientFactory;
   }
@@ -224,8 +286,8 @@ public class BlockOutputStreamEntryPool {
   }
 
   long getKeyLength() {
-    return streamEntries.stream().mapToLong(
-        BlockOutputStreamEntry::getCurrentPosition).sum();
+    return streamEntries.stream()
+        .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
   }
   /**
    * Contact OM to get a new block. Set the new block with the index (e.g.
@@ -244,7 +306,16 @@ public class BlockOutputStreamEntryPool {
     addKeyLocationInfo(subKeyInfo);
   }
 
-
+  /**
+   * Commits the keys with Ozone Manager(s).
+   * At the end of the write committing the key from client side lets the OM
+   * know that the data has been written and to where. With this info OM can
+   * register the metadata stored in OM and SCM about the key that was written.
+   * @param offset the offset on which the key writer stands at the time of
+   *               finishing data writes. (Has to be equal and checked against
+   *               the actual length written by the stream entries.)
+   * @throws IOException in case there is an I/O problem during communication.
+   */
   void commitKey(long offset) throws IOException {
     if (keyArgs != null) {
       // in test, this could be null
@@ -266,7 +337,7 @@ public class BlockOutputStreamEntryPool {
     }
   }
 
-  public BlockOutputStreamEntry getCurrentStreamEntry() {
+  BlockOutputStreamEntry getCurrentStreamEntry() {
     if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) {
       return null;
     } else {
@@ -274,6 +345,12 @@ public class BlockOutputStreamEntryPool {
     }
   }
 
+  /**
+   * Allocates a new block with OM if the current stream is closed, and new
+   * writes are to be handled.
+   * @return the new current open stream to write to
+   * @throws IOException if the block allocation failed.
+   */
   BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException {
     BlockOutputStreamEntry streamEntry = getCurrentStreamEntry();
     if (streamEntry != null && streamEntry.isClosed()) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 1e13c38..d5f6f5d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -299,8 +299,7 @@ public class KeyOutputStream extends OutputStream {
     Pipeline pipeline = streamEntry.getPipeline();
     PipelineID pipelineId = pipeline.getId();
     long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
-    //set the correct length for the current stream
-    streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
+    streamEntry.resetToAckedPosition();
     long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
     if (containerExclusionException) {
       LOG.debug(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org