Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/06/25 14:26:40 UTC

[GitHub] [ozone] umamaheswararao commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

umamaheswararao commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r658806450



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles EC writes by writing the data into the
+ * underlying block output streams chunk by chunk.
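+ * Once a full stripe of data cells has been accumulated, the parity cells
+ * are computed with the configured raw erasure encoder and written to the
+ * parity block output streams.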
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
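+  // With numDataBlks = 3, numParityBlks = 2 and cellSize = 1000, a full
+  // stripe carries 3 * 1000 = 3000 bytes of user data and produces
+  // 2 * 1000 = 2000 bytes of parity, one cell per block output stream.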
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC schema. This has to be created
+  //  dynamically once OM returns the configured schema details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec;
+
+  private long currentBlockGroupLen = 0;
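+  // Tracks how many bytes of the current block group have been written so
+  // far; assumed to be advanced by handleWrite (not shown in this hunk) and
+  // reset once the block group ends.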
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ECKeyOutputStream.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much data has actually been written to the underlying streams so far
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception was encountered during a write, meaning the whole
+  // write could not succeed
+  private boolean isException;
+  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
+  }
+
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  }
+
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion) {
+    ecChunkBufferCache =
+        new ECChunkBuffers(cellSize, numDataBlks, numParityBlks);
+    this.config = config;
+    OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(config, omClient, requestId,
+            replicationConfig, uploadID, partNumber, isMultipart, info,
+            unsafeByteBufferConversion, xceiverClientManager, handler.getId(),
+            true);
+
+    // Retrieve the file encryption key info; null if the file is not in an
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.retryPolicyMap = HddsClientUtils
+        .getRetryPolicyByException(config.getMaxRetryCount(),
+            config.getRetryInterval());
+    this.retryCount = 0;
+    this.isException = false;
+    this.writeOffset = 0;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    this.codec = new RSErasureCodec(conf, options);
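+    // Note: getPolicies().get(1) is assumed to resolve, by list position, to
+    // the built-in RS-3-2 system policy, which matches the hardcoded RS(3,2)
+    // schema above; only its codec name is passed to the rawcoder factory.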
+    this.encoder = CodecUtil.createRawEncoder(conf,
+        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+        codec.getCoderOptions());
+  }
+
+  /**
+   * When a key is opened, it is possible that some blocks are already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add them to stream entries. However, a key's version
+   * also includes blocks from previous versions, and those old blocks must
+   * not be added to stream entries, because they should not be picked for
+   * writes. To do this, the following method ensures that only the blocks
+   * created in this particular open version are added to stream entries.
+   *
+   * @param version     the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  }
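+
+  // Illustrative sketch of the version filtering described above. The actual
+  // logic lives in BlockOutputStreamEntryPool; getCreateVersion() and
+  // addKeyLocationInfo() are assumed names, not part of this patch:
+  //
+  //   for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) {
+  //     if (subKeyInfo.getCreateVersion() == openVersion) {
+  //       addKeyLocationInfo(subKeyInfo);
+  //     }
+  //   }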
+
+  @Override
+  public void write(int b) throws IOException {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
+  }
+
+  /**
+   * Tries to write the byte sequence b[off:off+len) to the underlying EC
+   * block streams.
+   *
+   * @param b   byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    int currentChunkBufferRemainingLength =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .remaining();
+    int currentChunkBufferLen =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .position();
+    int maxLenToCurrChunkBuffer = (int) Math.min(len, cellSize);
+    int currentWriterChunkLenToWrite =
+        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == cellSize,
+        false);
+    checkAndWriteParityCells();
+
+    // Advance past the portion just written into the current cell buffer.
+    off += currentWriterChunkLenToWrite;
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / cellSize;
+    int lastCellSize = remLen % cellSize;
+    while (iters > 0) {
+      handleWrite(b, off, cellSize, true, false);
+      off += cellSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+    if (lastCellSize > 0) {
+      handleWrite(b, off, lastCellSize, false, false);
+      checkAndWriteParityCells();
+    }
+    writeOffset += len;
+  }
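+
+  // Example: with cellSize = 1000 and an empty current cell buffer,
+  // write(b, 0, 2500) lands as two full 1000-byte cells plus a trailing
+  // 500-byte partial cell that stays buffered until more data arrives.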
+
+  private void checkAndWriteParityCells() throws IOException {
+    // Check whether all data cells of the current stripe are full: the
+    // current index reaching numDataBlks means the data blocks are done.
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      // Encode the stripe and write out the parity cells.
+      writeParityCells();
+      // Check whether the block group has ended.
+      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+          .getLength()) {
+        blockOutputStreamEntryPool.endECBlock(numDataBlks);
+        currentBlockGroupLen = 0;
+      }
+    }
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = ecChunkBufferCache.getBuffers();
+    // Flip the data cells for reading, then encode them into parity cells.
+    for (int i = 0; i < numDataBlks; i++) {
+      buffers[i].flip();
+    }
+    encode(encoder, numDataBlks, buffers);
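+    // The encode(...) helper is not part of this hunk; a minimal sketch of
+    // what it is expected to do with the Hadoop rawcoder API (names assumed):
+    //   ByteBuffer[] dataCells = Arrays.copyOfRange(buffers, 0, numData);
+    //   ByteBuffer[] parityCells =
+    //       Arrays.copyOfRange(buffers, numData, buffers.length);
+    //   rawEncoder.encode(dataCells, parityCells);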
+    for (int i = numDataBlks; i < numDataBlks + numParityBlks; i++) {
+      handleWrite(buffers[i].array(), 0, cellSize, true, true);
+    }
+
+    ecChunkBufferCache.flipAllDataBuffers();

Review comment:
       Thanks. As I clarified already, we don't need this; I removed it in the latest patch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org