Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/06/15 06:39:08 UTC

[GitHub] [ozone] umamaheswararao opened a new pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

umamaheswararao opened a new pull request #2335:
URL: https://github.com/apache/ozone/pull/2335


   ## What changes were proposed in this pull request?
   Here is the initial patch for review.
   Added ECKeyOutputStream to handle EC mode writes.
   This will write the data chunk by chunk to the datanodes. Currently the datanodes are stub DN classes for easy testing until we have all the OM APIs ready.
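   
   For illustration, here is a minimal, self-contained sketch of the cell layout this stream uses (3 data + 2 parity blocks with a 1000-byte cell size, matching the defaults hard-coded in this initial patch). The class below is illustrative only, and XOR stands in for the real Reed-Solomon encoder:
   
   ```java
   import java.util.Arrays;
   
   // Illustrative only: one full stripe of input is split into data cells
   // (round robin across the data blocks), parity cells are derived from
   // them, and then every cell is written to its own block stream.
   public class EcStripeSketch {
     static final int CELL = 1000, DATA = 3, PARITY = 2;
   
     public static void main(String[] args) {
       byte[] input = new byte[CELL * DATA];          // exactly one full stripe
       Arrays.fill(input, (byte) 1);
   
       byte[][] cells = new byte[DATA + PARITY][CELL];
       for (int i = 0; i < DATA; i++) {               // data cells, one per data block
         System.arraycopy(input, i * CELL, cells[i], 0, CELL);
       }
       for (int p = DATA; p < DATA + PARITY; p++) {   // parity cells from the data cells
         for (int i = 0; i < DATA; i++) {
           for (int b = 0; b < CELL; b++) {
             cells[p][b] ^= cells[i][b];              // XOR stand-in for RS encoding
           }
         }
       }
       // In the real ECKeyOutputStream each cells[i] is written to the i-th
       // block output stream of the block group (data streams first, then parity).
     }
   }
   ```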
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-4940
   
   ## How was this patch tested?
   
   Used stub classes for OMTransport, datanode storage, etc.
   Wrote the chunk data to an EC key and verified it by reading it back in regular read mode (by tweaking the stub pipeline), and also tested it directly against the stub datanode storage. Adding some more tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r657874685



##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
##########
@@ -155,6 +156,9 @@ public BlockOutputStream(
     this.token = token;
 
     replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
+    if (replicationIndex > 0) {

Review comment:
       I would tend to agree we should subclass BlockOutputStream here. The changes are quite small, but they may get larger in the future, and then there will be a temptation to add more and more EC things into the original class.
   
   The HDFS code is full of "if EC else ..." logic, and it makes the code generally harder to follow. It's also good to avoid changing the original code if we can, since it works, and it makes reviews easier, not just now but later when we come to merge the branches. People get nervous when critical code paths get modified.
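   
   As a rough, self-contained illustration of the subclassing idea (the class names below are stand-ins, not the real Ozone classes):
   
   ```java
   // Stand-in classes, for illustration only: the non-EC write path stays in
   // the base class, and everything EC-specific lives in a subclass instead of
   // "if (replicationIndex > 0)" branches in the original code.
   class BlockOutputStreamBase {
     void writeChunk(byte[] data) {
       // common (non-EC) write path, unchanged
     }
   }
   
   class ECBlockOutputStream extends BlockOutputStreamBase {
     private final int replicationIndex;
   
     ECBlockOutputStream(int replicationIndex) {
       this.replicationIndex = replicationIndex;
     }
   
     @Override
     void writeChunk(byte[] data) {
       // EC-specific handling (e.g. tagging the chunk with replicationIndex)
       // goes here, keeping the parent class free of EC branches.
       super.writeChunk(data);
     }
   }
   ```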






[GitHub] [ozone] sodonnel commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r657866780



##########
File path: hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
##########
@@ -71,7 +71,7 @@
    * ContainerStateManager#getMatchingContainerByPipeline to take a lock on
    * the container allocations for a particular pipeline.
    */
-  private Pipeline(PipelineID id,

Review comment:
       Right now Pipeline is an immutable object, and this changes that. There is a comment in the pipeline class:
   
   ```
   /**
      * The immutable properties of pipeline object is used in
      * ContainerStateManager#getMatchingContainerByPipeline to take a lock on
      * the container allocations for a particular pipeline.
      */
   ```
   
   I feel it would probably be better to keep these methods private as they are now and use the builder to create a pipeline object where needed.
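   
   A small, self-contained sketch of the pattern being suggested, i.e. keep the object immutable and route any change through its builder (the classes below are illustrative, not the actual Pipeline API):
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   
   // Illustrative only: an immutable object whose state can only be changed by
   // building a new instance through its Builder, mirroring the suggestion to
   // keep Pipeline's setters private.
   final class ImmutablePipelineSketch {
     private final Map<String, Integer> replicaIndexes;
   
     private ImmutablePipelineSketch(Map<String, Integer> replicaIndexes) {
       this.replicaIndexes =
           Collections.unmodifiableMap(new HashMap<>(replicaIndexes));
     }
   
     int replicaIndexOf(String node) {
       return replicaIndexes.getOrDefault(node, 0);
     }
   
     static Builder newBuilder(ImmutablePipelineSketch from) {
       return new Builder(from.replicaIndexes);
     }
   
     static final class Builder {
       private final Map<String, Integer> replicaIndexes;
   
       private Builder(Map<String, Integer> existing) {
         this.replicaIndexes = new HashMap<>(existing);
       }
   
       Builder setReplicaIndex(String node, int index) {
         replicaIndexes.put(node, index);
         return this;
       }
   
       ImmutablePipelineSketch build() {
         return new ImmutablePipelineSketch(replicaIndexes);
       }
     }
   }
   ```
   
   A caller needing a pipeline with different replica indexes would then build a new instance from the existing one rather than mutating a shared, supposedly immutable object.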






[GitHub] [ozone] elek commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
elek commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r652815649



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec =
+      new RSErasureCodec(new Configuration(), options);

Review comment:
       new Configuration()






[GitHub] [ozone] elek commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
elek commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r658730085



##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
##########
@@ -95,6 +95,17 @@ void releaseBuffer(ChunkBuffer chunkBuffer) {
     currentBufferIndex--;
   }
 
+  void releaseCurrentBuffer(ChunkBuffer chunkBuffer) {

Review comment:
       Is this method required? It seems to be unused and the same as `releaseBuffer`, but I may be missing something.






[GitHub] [ozone] umamaheswararao commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r659119721



##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
##########
@@ -95,6 +95,17 @@ void releaseBuffer(ChunkBuffer chunkBuffer) {
     currentBufferIndex--;
   }
 
+  void releaseCurrentBuffer(ChunkBuffer chunkBuffer) {

Review comment:
       I cleared most of the unnecessary parts from BOS and cleaned it up. Thanks for the comment.






[GitHub] [ozone] umamaheswararao commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r659119588



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec;
+
+  private long currentBlockGroupLen = 0;
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyOutputStream.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much of data is actually written yet to underlying stream
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception is encountered while write and whole write could
+  // not succeed
+  private boolean isException;
+  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
+  }
+
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  }
+
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion) {
+    ecChunkBufferCache =
+        new ECChunkBuffers(cellSize, numDataBlks, numParityBlks);
+    this.config = config;
+    OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(config, omClient, requestId,
+            replicationConfig, uploadID, partNumber, isMultipart, info,
+            unsafeByteBufferConversion, xceiverClientManager, handler.getId(),
+            true);
+
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.retryPolicyMap = HddsClientUtils
+        .getRetryPolicyByException(config.getMaxRetryCount(),
+            config.getRetryInterval());
+    this.retryCount = 0;
+    this.isException = false;
+    this.writeOffset = 0;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    this.codec = new RSErasureCodec(conf, options);
+    this.encoder = CodecUtil.createRawEncoder(conf,
+        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+        codec.getCoderOptions());
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version     the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
+  }
+
+  /**
+   * Try to write the bytes sequence b[off:off+len) to underlying EC block
+   * streams.
+   *
+   * @param b   byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    int currentChunkBufferRemainingLength =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .remaining();
+    int currentChunkBufferLen =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .position();
+    int maxLenToCurrChunkBuffer = (int) Math.min(len, cellSize);
+    int currentWriterChunkLenToWrite =
+        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == cellSize,
+        false);
+    checkAndWriteParityCells();
+
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / cellSize;
+    int lastCellSize = remLen % cellSize;
+    while (iters > 0) {
+      handleWrite(b, off, cellSize, true, false);
+      off += cellSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+    if (lastCellSize > 0) {
+      handleWrite(b, off, lastCellSize, false, false);
+      checkAndWriteParityCells();
+    }
+    writeOffset += len;
+  }
+
+  private void checkAndWriteParityCells() throws IOException {
+    //check data blocks finished
+    //If index is > datanum blks
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      //Lets encode and write
+      //encoder.encode();
+      writeParityCells();
+      // check if block ends?
+      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+          .getLength()) {
+        blockOutputStreamEntryPool.endECBlock(numDataBlks);
+        currentBlockGroupLen = 0;
+      }
+    }
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = ecChunkBufferCache.getBuffers();
+    //encode the data cells
+    for (int i = 0; i < numDataBlks; i++) {
+      buffers[i].flip();
+    }
+    encode(encoder, numDataBlks, buffers);
+    for (int i = numDataBlks; i < numDataBlks + 2; i++) {
+      handleWrite(buffers[i].array(), 0, cellSize, true, true);
+    }
+
+    ecChunkBufferCache.flipAllDataBuffers();
+    ecChunkBufferCache.clear();
+  }
+
+  private static void encode(RawErasureEncoder encoder, int numData,
+      ByteBuffer[] buffers) throws IOException {
+    final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
+    final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
+    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
+    System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
+
+    encoder.encode(dataBuffers, parityBuffers);
+  }
+
+  private void handleWrite(byte[] b, int off, long len, boolean isFullCell,
+      boolean isParity) throws IOException {
+    ecChunkBufferCache
+        .addTo(blockOutputStreamEntryPool.getCurrIdx(), b, off, (int) len);
+    while (len > 0) {
+      try {
+
+        BlockOutputStreamEntry current =
+            blockOutputStreamEntryPool.allocateBlockIfNeeded();
+        // length(len) will be in int range if the call is happening through
+        // write API of blockOutputStream. Length can be in long range if it
+        // comes via Exception path.
+        int expectedWriteLen =
+            Math.min((int) len, (int) current.getRemaining());
+        long currentPos = current.getWrittenDataLength();
+        // writeLen will be updated based on whether the write was succeeded
+        // or if it sees an exception, how much the actual write was
+        // acknowledged.
+        int writtenLength =
+            writeToOutputStream(current, len, b, expectedWriteLen, off,
+                currentPos, isParity);
+        currentBlockGroupLen += isParity ? 0 : writtenLength;
+        if (current.getRemaining() <= 0) {
+          // since the current block is already written close the stream.
+          closeCurrentStream(StreamAction.CLOSE);
+        }
+
+        len -= writtenLength;
+        off += writtenLength;
+
+      } catch (Exception e) {
+        markStreamClosed();
+        throw new IOException(e);
+      }
+
+      if (isFullCell) {
+        handleFlushOrClose(StreamAction.FLUSH);
+        blockOutputStreamEntryPool.updateToNextStream(numDataBlks + 2);

Review comment:
       Thanks. I corrected it. It was left by mistake.






[GitHub] [ozone] umamaheswararao merged pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
umamaheswararao merged pull request #2335:
URL: https://github.com/apache/ozone/pull/2335


   




[GitHub] [ozone] umamaheswararao commented on pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#issuecomment-878413052


   Based on the above +1, I am going ahead and committing this.




[GitHub] [ozone] umamaheswararao commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r667013726



##########
File path: hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
##########
@@ -443,7 +443,7 @@ public static Builder newBuilder(Pipeline pipeline) {
     return new Builder(pipeline);
   }
 
-  private void setReplicaIndexes(Map<DatanodeDetails, Integer> replicaIndexes) {
+  public void setReplicaIndexes(Map<DatanodeDetails, Integer> replicaIndexes) {

Review comment:
       I agree. I think I missed it as it was used later, after build. I will correct that usage as well.






[GitHub] [ozone] sodonnel commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r658010473



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec;
+
+  private long currentBlockGroupLen = 0;
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyOutputStream.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much of data is actually written yet to underlying stream
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception is encountered while write and whole write could
+  // not succeed
+  private boolean isException;
+  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
+  }
+
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  }
+
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion) {
+    ecChunkBufferCache =
+        new ECChunkBuffers(cellSize, numDataBlks, numParityBlks);
+    this.config = config;
+    OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(config, omClient, requestId,
+            replicationConfig, uploadID, partNumber, isMultipart, info,
+            unsafeByteBufferConversion, xceiverClientManager, handler.getId(),
+            true);
+
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.retryPolicyMap = HddsClientUtils
+        .getRetryPolicyByException(config.getMaxRetryCount(),
+            config.getRetryInterval());
+    this.retryCount = 0;
+    this.isException = false;
+    this.writeOffset = 0;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    this.codec = new RSErasureCodec(conf, options);
+    this.encoder = CodecUtil.createRawEncoder(conf,
+        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+        codec.getCoderOptions());
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version     the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
+  }
+
+  /**
+   * Try to write the bytes sequence b[off:off+len) to underlying EC block
+   * streams.
+   *
+   * @param b   byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    int currentChunkBufferRemainingLength =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .remaining();
+    int currentChunkBufferLen =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .position();
+    int maxLenToCurrChunkBuffer = (int) Math.min(len, cellSize);
+    int currentWriterChunkLenToWrite =
+        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == cellSize,
+        false);
+    checkAndWriteParityCells();
+
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / cellSize;
+    int lastCellSize = remLen % cellSize;
+    while (iters > 0) {
+      handleWrite(b, off, cellSize, true, false);
+      off += cellSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+    if (lastCellSize > 0) {
+      handleWrite(b, off, lastCellSize, false, false);
+      checkAndWriteParityCells();
+    }
+    writeOffset += len;
+  }
+
+  private void checkAndWriteParityCells() throws IOException {
+    //check data blocks finished
+    //If index is > datanum blks
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      //Lets encode and write
+      //encoder.encode();
+      writeParityCells();
+      // check if block ends?
+      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+          .getLength()) {
+        blockOutputStreamEntryPool.endECBlock(numDataBlks);
+        currentBlockGroupLen = 0;
+      }
+    }
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = ecChunkBufferCache.getBuffers();
+    //encode the data cells
+    for (int i = 0; i < numDataBlks; i++) {
+      buffers[i].flip();
+    }
+    encode(encoder, numDataBlks, buffers);
+    for (int i = numDataBlks; i < numDataBlks + 2; i++) {
+      handleWrite(buffers[i].array(), 0, cellSize, true, true);
+    }
+
+    ecChunkBufferCache.flipAllDataBuffers();
+    ecChunkBufferCache.clear();
+  }
+
+  private static void encode(RawErasureEncoder encoder, int numData,
+      ByteBuffer[] buffers) throws IOException {
+    final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
+    final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
+    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
+    System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
+
+    encoder.encode(dataBuffers, parityBuffers);
+  }
+
+  private void handleWrite(byte[] b, int off, long len, boolean isFullCell,
+      boolean isParity) throws IOException {
+    ecChunkBufferCache
+        .addTo(blockOutputStreamEntryPool.getCurrIdx(), b, off, (int) len);
+    while (len > 0) {
+      try {
+
+        BlockOutputStreamEntry current =
+            blockOutputStreamEntryPool.allocateBlockIfNeeded();
+        // length(len) will be in int range if the call is happening through
+        // write API of blockOutputStream. Length can be in long range if it
+        // comes via Exception path.
+        int expectedWriteLen =
+            Math.min((int) len, (int) current.getRemaining());
+        long currentPos = current.getWrittenDataLength();
+        // writeLen will be updated based on whether the write was succeeded
+        // or if it sees an exception, how much the actual write was
+        // acknowledged.
+        int writtenLength =
+            writeToOutputStream(current, len, b, expectedWriteLen, off,
+                currentPos, isParity);
+        currentBlockGroupLen += isParity ? 0 : writtenLength;
+        if (current.getRemaining() <= 0) {
+          // since the current block is already written close the stream.
+          closeCurrentStream(StreamAction.CLOSE);
+        }
+
+        len -= writtenLength;
+        off += writtenLength;
+
+      } catch (Exception e) {
+        markStreamClosed();
+        throw new IOException(e);
+      }
+
+      if (isFullCell) {
+        handleFlushOrClose(StreamAction.FLUSH);
+        blockOutputStreamEntryPool.updateToNextStream(numDataBlks + 2);
+      }
+    }
+
+  }
+
+  private int writeToOutputStream(BlockOutputStreamEntry current, long len,
+      byte[] b, int writeLen, int off, long currentPos, boolean isParity)
+      throws IOException {
+    try {
+      current.write(b, off, writeLen);
+      if (!isParity) {
+        offset += writeLen;
+      }
+    } catch (IOException ioe) {
+      // for the current iteration, totalDataWritten - currentPos gives the
+      // amount of data already written to the buffer
+
+      // In the retryPath, the total data to be written will always be equal
+      // to or less than the max length of the buffer allocated.
+      // The len specified here is the combined sum of the data length of
+      // the buffers
+      Preconditions.checkState(len <= config.getStreamBufferMaxSize());
+      int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+      writeLen = dataWritten;
+
+      if (!isParity) {
+        offset += writeLen;
+      }
+      LOG.debug("writeLen {}, total len {}", writeLen, len);
+      handleException(current, ioe);
+    }
+    return writeLen;
+  }
+
+  private void handleException(BlockOutputStreamEntry streamEntry,
+      IOException exception) throws IOException {
+    Throwable t = HddsClientUtils.checkForException(exception);
+    Preconditions.checkNotNull(t);
+    // In EC, we will just close the current stream.
+    streamEntry.close();
+  }
+
+  private void markStreamClosed() {
+    blockOutputStreamEntryPool.cleanup();
+    closed = true;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    checkNotClosed();
+    handleFlushOrClose(StreamAction.FLUSH);
+  }
+
+  /**
+   * Close or Flush the latest outputStream depending upon the action.
+   * This function gets called when while write is going on, the current stream
+   * gets full or explicit flush or close request is made by client.
+   *
+   * @param op Flag which decides whether to call close or flush on the
+   *           outputStream.
+   * @throws IOException In case, flush or close fails with exception.
+   */
+  @SuppressWarnings("squid:S1141")
+  private void handleFlushOrClose(StreamAction op) throws IOException {
+    if (!blockOutputStreamEntryPool.isEmpty()) {
+      while (true) {
+        try {
+          BlockOutputStreamEntry entry =
+              blockOutputStreamEntryPool.getCurrentStreamEntry();
+          if (entry != null) {
+            try {
+              handleStreamAction(entry, op);
+            } catch (IOException ioe) {
+              handleException(entry, ioe);
+              continue;
+            }
+          }
+          return;
+        } catch (Exception e) {
+          markStreamClosed();
+          throw e;
+        }
+      }
+    }
+  }
+
+  private void handleFlushOrCloseAllStreams(StreamAction op)
+      throws IOException {
+    if (!blockOutputStreamEntryPool.isEmpty()) {
+      List<BlockOutputStreamEntry> allStreamEntries =
+          blockOutputStreamEntryPool.getStreamEntries();
+      for (int i = 0; i < allStreamEntries.size(); i++) {
+        while (true) {
+          try {
+            BlockOutputStreamEntry entry = allStreamEntries.get(i);
+            if (entry != null) {
+              try {
+                handleStreamAction(entry, op);
+              } catch (IOException ioe) {
+                handleException(entry, ioe);
+                continue;
+              }
+            }
+            return;
+          } catch (Exception e) {
+            markStreamClosed();
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
+  private void closeCurrentStream(StreamAction op) throws IOException {
+    if (!blockOutputStreamEntryPool.isEmpty()) {
+      List<BlockOutputStreamEntry> allStreamEntries =
+          blockOutputStreamEntryPool.getStreamEntries();
+      for (int i = 0; i < allStreamEntries.size(); i++) {
+        while (true) {
+          try {
+            BlockOutputStreamEntry entry = allStreamEntries.get(i);
+            if (entry != null) {
+              try {
+                handleStreamAction(entry, op);
+              } catch (IOException ioe) {
+                handleException(entry, ioe);
+                continue;
+              }
+            }
+            return;
+          } catch (Exception e) {
+            markStreamClosed();
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
+  private void handleStreamAction(BlockOutputStreamEntry entry, StreamAction op)
+      throws IOException {
+    Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+    // failed servers can be null in case there is no data written in
+    // the stream
+    if (!failedServers.isEmpty()) {
+      blockOutputStreamEntryPool.getExcludeList().addDatanodes(failedServers);
+    }
+    switch (op) {
+    case CLOSE:
+      entry.close();
+      break;
+    case FULL:
+      if (entry.getRemaining() == 0) {
+        entry.close();
+      }
+      break;
+    case FLUSH:
+      entry.flush();
+      break;
+    default:
+      throw new IOException("Invalid Operation");
+    }
+  }
+
+  /**
+   * Commit the key to OM, this will add the blocks as the new key blocks.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      handleFlushOrCloseAllStreams(StreamAction.CLOSE);
+      if (!isException) {
+        Preconditions.checkArgument(writeOffset == offset);
+      }
+      blockOutputStreamEntryPool.endECBlock(numDataBlks);
+      //TODO: offset should not consider parity blocks length
+      blockOutputStreamEntryPool.commitKey(offset);
+    } finally {
+      blockOutputStreamEntryPool.cleanupAll();
+    }
+  }
+
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return blockOutputStreamEntryPool.getCommitUploadPartInfo();
+  }
+
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return feInfo;
+  }
+
+  @VisibleForTesting
+  public ExcludeList getExcludeList() {
+    return blockOutputStreamEntryPool.getExcludeList();
+  }
+
+  /**
+   * Builder class of ECKeyOutputStream.
+   */
+  public static class Builder {
+    private OpenKeySession openHandler;
+    private XceiverClientFactory xceiverManager;
+    private OzoneManagerProtocol omClient;
+    private int chunkSize;
+    private String requestID;
+    private String multipartUploadID;
+    private int multipartNumber;
+    private boolean isMultipartKey;
+    private boolean unsafeByteBufferConversion;
+    private OzoneClientConfig clientConfig;
+    private ReplicationConfig replicationConfig;
+
+    public Builder setMultipartUploadID(String uploadID) {
+      this.multipartUploadID = uploadID;
+      return this;
+    }
+
+    public Builder setMultipartNumber(int partNumber) {
+      this.multipartNumber = partNumber;
+      return this;
+    }
+
+    public Builder setHandler(OpenKeySession handler) {
+      this.openHandler = handler;
+      return this;
+    }
+
+    public Builder setXceiverClientManager(XceiverClientFactory manager) {
+      this.xceiverManager = manager;
+      return this;
+    }
+
+    public Builder setOmClient(OzoneManagerProtocol client) {
+      this.omClient = client;
+      return this;
+    }
+
+    public Builder setChunkSize(int size) {
+      this.chunkSize = size;
+      return this;
+    }
+
+    public Builder setRequestID(String id) {
+      this.requestID = id;
+      return this;
+    }
+
+    public Builder setIsMultipartKey(boolean isMultipart) {
+      this.isMultipartKey = isMultipart;
+      return this;
+    }
+
+    public Builder setConfig(OzoneClientConfig config) {
+      this.clientConfig = config;
+      return this;
+    }
+
+    public Builder enableUnsafeByteBufferConversion(boolean enabled) {
+      this.unsafeByteBufferConversion = enabled;
+      return this;
+    }
+
+    public ECKeyOutputStream.Builder setReplicationConfig(
+        ReplicationConfig replConfig) {
+      this.replicationConfig = replConfig;
+      return this;
+    }
+
+    public ECKeyOutputStream build() {
+      return new ECKeyOutputStream(clientConfig, openHandler, xceiverManager,
+          omClient, chunkSize, requestID, replicationConfig, multipartUploadID,
+          multipartNumber, isMultipartKey, unsafeByteBufferConversion);
+    }
+  }
+
+  /**
+   * Verify that the output stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   *
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+              + blockOutputStreamEntryPool.getKeyName());
+    }
+  }
+
+  private static class ECChunkBuffers {
+    private final ByteBuffer[] buffers;
+    private final int dataBlks;
+    private final int parityBlks;
+    private final int totalBlks;
+    private int cellSize;
+
+    ECChunkBuffers(int cellSize, int numData, int numParity) {
+      this.cellSize = cellSize;
+      this.parityBlks = numParity;
+      this.dataBlks = numData;
+      this.totalBlks = this.dataBlks + this.parityBlks;
+      buffers = new ByteBuffer[this.totalBlks];
+      for (int i = 0; i < buffers.length; i++) {
+        buffers[i] = BUFFER_POOL.getBuffer(false, cellSize);
+        buffers[i].limit(cellSize);
+      }
+    }
+
+    private ByteBuffer[] getBuffers() {
+      return buffers;
+    }
+
+    private int addTo(int i, byte[] b, int off, int len) {
+      final ByteBuffer buf = buffers[i];
+      final int pos = buf.position() + len;
+      Preconditions.checkState(pos <= cellSize);
+      buf.put(b, off, len);
+      return pos;
+    }
+
+    private void clear() {
+      for (int i = 0; i < this.totalBlks; i++) {
+        buffers[i].clear();
+        buffers[i].limit(cellSize);
+      }
+    }
+
+    private void release() {

Review comment:
       I don't see this being called anywhere (IntelliJ marks it as unused). This means we are never returning the buffers to the pool, and hence they cannot be reused. Should this be called when the KeyOutputStream is closed?
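   
   To make the concern concrete, here is a minimal, self-contained example of the getBuffer/putBuffer pairing on Hadoop's ElasticByteBufferPool (the pool type used by ECChunkBuffers); in the PR this would correspond to calling release() from close():
   
   ```java
   import java.nio.ByteBuffer;
   
   import org.apache.hadoop.io.ByteBufferPool;
   import org.apache.hadoop.io.ElasticByteBufferPool;
   
   // Illustration only: buffers taken from the pool must be handed back with
   // putBuffer, otherwise the pool can never reuse them.
   public class BufferReleaseSketch {
     public static void main(String[] args) {
       ByteBufferPool pool = new ElasticByteBufferPool();
       ByteBuffer[] cells = new ByteBuffer[5];
       for (int i = 0; i < cells.length; i++) {
         cells[i] = pool.getBuffer(false, 1000);   // like ECChunkBuffers' cells
       }
       // ... fill, encode and write the cells ...
       for (ByteBuffer cell : cells) {
         pool.putBuffer(cell);                     // the "release()" step on close
       }
     }
   }
   ```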






[GitHub] [ozone] umamaheswararao commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r658806730



##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
##########
@@ -258,6 +262,7 @@ public void write(byte[] b, int off, int len) throws IOException {
     while (len > 0) {
       allocateNewBufferIfNeeded();
       final int writeLen = Math.min(currentBufferRemaining, len);
+      LOG.info("writeLen: " + writeLen + "  off: " + off);

Review comment:
       Thanks for pointing that out. Fixed.
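   
   For reference, the flagged line was an unconditional LOG.info in the per-chunk write loop; the fix here was presumably just removing it, but if such logging were ever kept, the conventional SLF4J form would be guarded, parameterized debug logging, e.g.:
   
   ```java
   // Hypothetical replacement for the stray LOG.info; SLF4J placeholders mean
   // the message is only built when debug logging is actually enabled.
   if (LOG.isDebugEnabled()) {
     LOG.debug("writeLen: {}, off: {}", writeLen, off);
   }
   ```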






[GitHub] [ozone] umamaheswararao commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r659119545



##########
File path: hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
##########
@@ -71,7 +71,7 @@
    * ContainerStateManager#getMatchingContainerByPipeline to take a lock on
    * the container allocations for a particular pipeline.
    */
-  private Pipeline(PipelineID id,

Review comment:
       Done. Makes sense. Thanks.

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec;
+
+  private long currentBlockGroupLen = 0;
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyOutputStream.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much of data is actually written yet to underlying stream
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception is encountered while write and whole write could
+  // not succeed
+  private boolean isException;
+  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
+  }
+
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  }
+
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion) {
+    ecChunkBufferCache =
+        new ECChunkBuffers(cellSize, numDataBlks, numParityBlks);
+    this.config = config;
+    OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(config, omClient, requestId,
+            replicationConfig, uploadID, partNumber, isMultipart, info,
+            unsafeByteBufferConversion, xceiverClientManager, handler.getId(),
+            true);
+
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.retryPolicyMap = HddsClientUtils
+        .getRetryPolicyByException(config.getMaxRetryCount(),
+            config.getRetryInterval());
+    this.retryCount = 0;
+    this.isException = false;
+    this.writeOffset = 0;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    this.codec = new RSErasureCodec(conf, options);
+    this.encoder = CodecUtil.createRawEncoder(conf,
+        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+        codec.getCoderOptions());
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version     the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
+  }
+
+  /**
+   * Try to write the bytes sequence b[off:off+len) to underlying EC block
+   * streams.
+   *
+   * @param b   byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    int currentChunkBufferRemainingLength =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .remaining();
+    int currentChunkBufferLen =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .position();
+    int maxLenToCurrChunkBuffer = (int) Math.min(len, cellSize);
+    int currentWriterChunkLenToWrite =
+        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == cellSize,
+        false);
+    checkAndWriteParityCells();
+
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / cellSize;
+    int lastCellSize = remLen % cellSize;
+    while (iters > 0) {
+      handleWrite(b, off, cellSize, true, false);
+      off += cellSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+    if (lastCellSize > 0) {
+      handleWrite(b, off, lastCellSize, false, false);
+      checkAndWriteParityCells();
+    }
+    writeOffset += len;
+  }
+
+  private void checkAndWriteParityCells() throws IOException {
+    //check data blocks finished
+    //If index is > datanum blks
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      //Lets encode and write
+      //encoder.encode();
+      writeParityCells();
+      // check if block ends?
+      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+          .getLength()) {
+        blockOutputStreamEntryPool.endECBlock(numDataBlks);
+        currentBlockGroupLen = 0;
+      }
+    }
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = ecChunkBufferCache.getBuffers();
+    //encode the data cells
+    for (int i = 0; i < numDataBlks; i++) {
+      buffers[i].flip();
+    }
+    encode(encoder, numDataBlks, buffers);
+    for (int i = numDataBlks; i < numDataBlks + 2; i++) {
+      handleWrite(buffers[i].array(), 0, cellSize, true, true);
+    }
+
+    ecChunkBufferCache.flipAllDataBuffers();
+    ecChunkBufferCache.clear();
+  }
+
+  private static void encode(RawErasureEncoder encoder, int numData,
+      ByteBuffer[] buffers) throws IOException {
+    final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
+    final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
+    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
+    System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
+
+    encoder.encode(dataBuffers, parityBuffers);
+  }
+
+  private void handleWrite(byte[] b, int off, long len, boolean isFullCell,
+      boolean isParity) throws IOException {
+    ecChunkBufferCache
+        .addTo(blockOutputStreamEntryPool.getCurrIdx(), b, off, (int) len);
+    while (len > 0) {
+      try {
+
+        BlockOutputStreamEntry current =
+            blockOutputStreamEntryPool.allocateBlockIfNeeded();
+        // length(len) will be in int range if the call is happening through
+        // write API of blockOutputStream. Length can be in long range if it
+        // comes via Exception path.
+        int expectedWriteLen =
+            Math.min((int) len, (int) current.getRemaining());
+        long currentPos = current.getWrittenDataLength();
+        // writeLen will be updated based on whether the write was succeeded
+        // or if it sees an exception, how much the actual write was
+        // acknowledged.
+        int writtenLength =
+            writeToOutputStream(current, len, b, expectedWriteLen, off,
+                currentPos, isParity);
+        currentBlockGroupLen += isParity ? 0 : writtenLength;
+        if (current.getRemaining() <= 0) {
+          // since the current block is already written close the stream.
+          closeCurrentStream(StreamAction.CLOSE);
+        }
+
+        len -= writtenLength;
+        off += writtenLength;
+
+      } catch (Exception e) {
+        markStreamClosed();
+        throw new IOException(e);
+      }
+
+      if (isFullCell) {
+        handleFlushOrClose(StreamAction.FLUSH);
+        blockOutputStreamEntryPool.updateToNextStream(numDataBlks + 2);
+      }
+    }
+
+  }
+
+  private int writeToOutputStream(BlockOutputStreamEntry current, long len,
+      byte[] b, int writeLen, int off, long currentPos, boolean isParity)
+      throws IOException {
+    try {
+      current.write(b, off, writeLen);
+      if (!isParity) {
+        offset += writeLen;
+      }
+    } catch (IOException ioe) {
+      // for the current iteration, totalDataWritten - currentPos gives the
+      // amount of data already written to the buffer
+
+      // In the retryPath, the total data to be written will always be equal
+      // to or less than the max length of the buffer allocated.
+      // The len specified here is the combined sum of the data length of
+      // the buffers
+      Preconditions.checkState(len <= config.getStreamBufferMaxSize());
+      int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+      writeLen = dataWritten;
+
+      if (!isParity) {
+        offset += writeLen;
+      }
+      LOG.debug("writeLen {}, total len {}", writeLen, len);
+      handleException(current, ioe);
+    }
+    return writeLen;
+  }
+
+  private void handleException(BlockOutputStreamEntry streamEntry,
+      IOException exception) throws IOException {
+    Throwable t = HddsClientUtils.checkForException(exception);
+    Preconditions.checkNotNull(t);
+    // In EC, we will just close the current stream.
+    streamEntry.close();
+  }
+
+  private void markStreamClosed() {
+    blockOutputStreamEntryPool.cleanup();
+    closed = true;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    checkNotClosed();
+    handleFlushOrClose(StreamAction.FLUSH);
+  }
+
+  /**
+   * Close or Flush the latest outputStream depending upon the action.
+   * This function gets called when while write is going on, the current stream
+   * gets full or explicit flush or close request is made by client.
+   *
+   * @param op Flag which decides whether to call close or flush on the
+   *           outputStream.
+   * @throws IOException In case, flush or close fails with exception.
+   */
+  @SuppressWarnings("squid:S1141")
+  private void handleFlushOrClose(StreamAction op) throws IOException {
+    if (!blockOutputStreamEntryPool.isEmpty()) {
+      while (true) {
+        try {
+          BlockOutputStreamEntry entry =
+              blockOutputStreamEntryPool.getCurrentStreamEntry();
+          if (entry != null) {
+            try {
+              handleStreamAction(entry, op);
+            } catch (IOException ioe) {
+              handleException(entry, ioe);
+              continue;
+            }
+          }
+          return;
+        } catch (Exception e) {
+          markStreamClosed();
+          throw e;
+        }
+      }
+    }
+  }
+
+  private void handleFlushOrCloseAllStreams(StreamAction op)
+      throws IOException {
+    if (!blockOutputStreamEntryPool.isEmpty()) {
+      List<BlockOutputStreamEntry> allStreamEntries =
+          blockOutputStreamEntryPool.getStreamEntries();
+      for (int i = 0; i < allStreamEntries.size(); i++) {
+        while (true) {
+          try {
+            BlockOutputStreamEntry entry = allStreamEntries.get(i);
+            if (entry != null) {
+              try {
+                handleStreamAction(entry, op);
+              } catch (IOException ioe) {
+                handleException(entry, ioe);
+                continue;
+              }
+            }
+            return;
+          } catch (Exception e) {
+            markStreamClosed();
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
+  private void closeCurrentStream(StreamAction op) throws IOException {
+    if (!blockOutputStreamEntryPool.isEmpty()) {
+      List<BlockOutputStreamEntry> allStreamEntries =
+          blockOutputStreamEntryPool.getStreamEntries();
+      for (int i = 0; i < allStreamEntries.size(); i++) {
+        while (true) {
+          try {
+            BlockOutputStreamEntry entry = allStreamEntries.get(i);
+            if (entry != null) {
+              try {
+                handleStreamAction(entry, op);
+              } catch (IOException ioe) {
+                handleException(entry, ioe);
+                continue;
+              }
+            }
+            return;
+          } catch (Exception e) {
+            markStreamClosed();
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
+  private void handleStreamAction(BlockOutputStreamEntry entry, StreamAction op)
+      throws IOException {
+    Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+    // failed servers can be null in case there is no data written in
+    // the stream
+    if (!failedServers.isEmpty()) {
+      blockOutputStreamEntryPool.getExcludeList().addDatanodes(failedServers);
+    }
+    switch (op) {
+    case CLOSE:
+      entry.close();
+      break;
+    case FULL:
+      if (entry.getRemaining() == 0) {
+        entry.close();
+      }
+      break;
+    case FLUSH:
+      entry.flush();
+      break;
+    default:
+      throw new IOException("Invalid Operation");
+    }
+  }
+
+  /**
+   * Commit the key to OM, this will add the blocks as the new key blocks.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      handleFlushOrCloseAllStreams(StreamAction.CLOSE);
+      if (!isException) {
+        Preconditions.checkArgument(writeOffset == offset);
+      }
+      blockOutputStreamEntryPool.endECBlock(numDataBlks);
+      //TODO: offset should not consider parity blocks length
+      blockOutputStreamEntryPool.commitKey(offset);
+    } finally {
+      blockOutputStreamEntryPool.cleanupAll();
+    }
+  }
+
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return blockOutputStreamEntryPool.getCommitUploadPartInfo();
+  }
+
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return feInfo;
+  }
+
+  @VisibleForTesting
+  public ExcludeList getExcludeList() {
+    return blockOutputStreamEntryPool.getExcludeList();
+  }
+
+  /**
+   * Builder class of ECKeyOutputStream.
+   */
+  public static class Builder {
+    private OpenKeySession openHandler;
+    private XceiverClientFactory xceiverManager;
+    private OzoneManagerProtocol omClient;
+    private int chunkSize;
+    private String requestID;
+    private String multipartUploadID;
+    private int multipartNumber;
+    private boolean isMultipartKey;
+    private boolean unsafeByteBufferConversion;
+    private OzoneClientConfig clientConfig;
+    private ReplicationConfig replicationConfig;
+
+    public Builder setMultipartUploadID(String uploadID) {
+      this.multipartUploadID = uploadID;
+      return this;
+    }
+
+    public Builder setMultipartNumber(int partNumber) {
+      this.multipartNumber = partNumber;
+      return this;
+    }
+
+    public Builder setHandler(OpenKeySession handler) {
+      this.openHandler = handler;
+      return this;
+    }
+
+    public Builder setXceiverClientManager(XceiverClientFactory manager) {
+      this.xceiverManager = manager;
+      return this;
+    }
+
+    public Builder setOmClient(OzoneManagerProtocol client) {
+      this.omClient = client;
+      return this;
+    }
+
+    public Builder setChunkSize(int size) {
+      this.chunkSize = size;
+      return this;
+    }
+
+    public Builder setRequestID(String id) {
+      this.requestID = id;
+      return this;
+    }
+
+    public Builder setIsMultipartKey(boolean isMultipart) {
+      this.isMultipartKey = isMultipart;
+      return this;
+    }
+
+    public Builder setConfig(OzoneClientConfig config) {
+      this.clientConfig = config;
+      return this;
+    }
+
+    public Builder enableUnsafeByteBufferConversion(boolean enabled) {
+      this.unsafeByteBufferConversion = enabled;
+      return this;
+    }
+
+    public ECKeyOutputStream.Builder setReplicationConfig(
+        ReplicationConfig replConfig) {
+      this.replicationConfig = replConfig;
+      return this;
+    }
+
+    public ECKeyOutputStream build() {
+      return new ECKeyOutputStream(clientConfig, openHandler, xceiverManager,
+          omClient, chunkSize, requestID, replicationConfig, multipartUploadID,
+          multipartNumber, isMultipartKey, unsafeByteBufferConversion);
+    }
+  }
+
+  /**
+   * Verify that the output stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   *
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+              + blockOutputStreamEntryPool.getKeyName());
+    }
+  }
+
+  private static class ECChunkBuffers {
+    private final ByteBuffer[] buffers;
+    private final int dataBlks;
+    private final int parityBlks;
+    private final int totalBlks;
+    private int cellSize;
+
+    ECChunkBuffers(int cellSize, int numData, int numParity) {
+      this.cellSize = cellSize;
+      this.parityBlks = numParity;
+      this.dataBlks = numData;
+      this.totalBlks = this.dataBlks + this.parityBlks;
+      buffers = new ByteBuffer[this.totalBlks];
+      for (int i = 0; i < buffers.length; i++) {
+        buffers[i] = BUFFER_POOL.getBuffer(false, cellSize);
+        buffers[i].limit(cellSize);
+      }
+    }
+
+    private ByteBuffer[] getBuffers() {
+      return buffers;
+    }
+
+    private int addTo(int i, byte[] b, int off, int len) {
+      final ByteBuffer buf = buffers[i];
+      final int pos = buf.position() + len;
+      Preconditions.checkState(pos <= cellSize);
+      buf.put(b, off, len);
+      return pos;
+    }
+
+    private void clear() {
+      for (int i = 0; i < this.totalBlks; i++) {
+        buffers[i].clear();
+        buffers[i].limit(cellSize);
+      }
+    }
+
+    private void release() {

Review comment:
       done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] umamaheswararao commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r658808845



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec;
+
+  private long currentBlockGroupLen = 0;
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyOutputStream.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much of data is actually written yet to underlying stream
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception is encountered while write and whole write could
+  // not succeed
+  private boolean isException;
+  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
+  }
+
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  }
+
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion) {
+    ecChunkBufferCache =
+        new ECChunkBuffers(cellSize, numDataBlks, numParityBlks);
+    this.config = config;
+    OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(config, omClient, requestId,
+            replicationConfig, uploadID, partNumber, isMultipart, info,
+            unsafeByteBufferConversion, xceiverClientManager, handler.getId(),
+            true);
+
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.retryPolicyMap = HddsClientUtils
+        .getRetryPolicyByException(config.getMaxRetryCount(),
+            config.getRetryInterval());
+    this.retryCount = 0;
+    this.isException = false;
+    this.writeOffset = 0;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    this.codec = new RSErasureCodec(conf, options);
+    this.encoder = CodecUtil.createRawEncoder(conf,
+        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+        codec.getCoderOptions());
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version     the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
+  }
+
+  /**
+   * Try to write the bytes sequence b[off:off+len) to underlying EC block
+   * streams.
+   *
+   * @param b   byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    int currentChunkBufferRemainingLength =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .remaining();
+    int currentChunkBufferLen =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .position();
+    int maxLenToCurrChunkBuffer = (int) Math.min(len, cellSize);
+    int currentWriterChunkLenToWrite =
+        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == cellSize,
+        false);
+    checkAndWriteParityCells();
+
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / cellSize;
+    int lastCellSize = remLen % cellSize;
+    while (iters > 0) {
+      handleWrite(b, off, cellSize, true, false);
+      off += cellSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+    if (lastCellSize > 0) {
+      handleWrite(b, off, lastCellSize, false, false);
+      checkAndWriteParityCells();
+    }
+    writeOffset += len;
+  }
+
+  private void checkAndWriteParityCells() throws IOException {
+    //check data blocks finished
+    //If index is > datanum blks
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      //Lets encode and write
+      //encoder.encode();
+      writeParityCells();
+      // check if block ends?
+      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+          .getLength()) {
+        blockOutputStreamEntryPool.endECBlock(numDataBlks);
+        currentBlockGroupLen = 0;
+      }
+    }
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = ecChunkBufferCache.getBuffers();
+    //encode the data cells
+    for (int i = 0; i < numDataBlks; i++) {
+      buffers[i].flip();
+    }
+    encode(encoder, numDataBlks, buffers);
+    for (int i = numDataBlks; i < numDataBlks + 2; i++) {
+      handleWrite(buffers[i].array(), 0, cellSize, true, true);
+    }
+
+    ecChunkBufferCache.flipAllDataBuffers();
+    ecChunkBufferCache.clear();
+  }
+
+  private static void encode(RawErasureEncoder encoder, int numData,
+      ByteBuffer[] buffers) throws IOException {
+    final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
+    final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
+    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);

Review comment:
       We discussed this, and the arraycopy would not really copy the contents; it just copies the references, so the original buffer arrays will still be modified. Anyway, I changed it to take the input buffers and create the parity buffers separately.
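
A minimal, self-contained sketch of the reference-copy behaviour described above (illustration only, not code from the patch; `System.arraycopy` over an object array copies references, not buffer contents):

```java
import java.nio.ByteBuffer;

public class ArrayCopySketch {
  public static void main(String[] args) {
    ByteBuffer[] buffers = new ByteBuffer[5];
    for (int i = 0; i < buffers.length; i++) {
      buffers[i] = ByteBuffer.allocate(16);
    }

    // "Copy" the first three entries into a new array.
    ByteBuffer[] dataBuffers = new ByteBuffer[3];
    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);

    // Only the references were copied: both arrays point at the same buffer
    // objects, so a write through one array is visible through the other.
    System.out.println(dataBuffers[0] == buffers[0]); // true
    dataBuffers[0].put((byte) 42);
    System.out.println(buffers[0].position());        // 1
  }
}
```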




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] elek commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
elek commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r655971848



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec;
+
+  private long currentBlockGroupLen = 0;
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyOutputStream.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much of data is actually written yet to underlying stream
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception is encountered while write and whole write could
+  // not succeed
+  private boolean isException;
+  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
+  }
+
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  }
+
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion) {
+    ecChunkBufferCache =
+        new ECChunkBuffers(cellSize, numDataBlks, numParityBlks);
+    this.config = config;
+    OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(config, omClient, requestId,
+            replicationConfig, uploadID, partNumber, isMultipart, info,
+            unsafeByteBufferConversion, xceiverClientManager, handler.getId(),
+            true);
+
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.retryPolicyMap = HddsClientUtils
+        .getRetryPolicyByException(config.getMaxRetryCount(),
+            config.getRetryInterval());
+    this.retryCount = 0;
+    this.isException = false;
+    this.writeOffset = 0;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    this.codec = new RSErasureCodec(conf, options);
+    this.encoder = CodecUtil.createRawEncoder(conf,
+        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+        codec.getCoderOptions());
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version     the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
+  }
+
+  /**
+   * Try to write the bytes sequence b[off:off+len) to underlying EC block
+   * streams.
+   *
+   * @param b   byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    int currentChunkBufferRemainingLength =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .remaining();
+    int currentChunkBufferLen =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .position();
+    int maxLenToCurrChunkBuffer = (int) Math.min(len, cellSize);
+    int currentWriterChunkLenToWrite =
+        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == cellSize,
+        false);
+    checkAndWriteParityCells();
+
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / cellSize;
+    int lastCellSize = remLen % cellSize;
+    while (iters > 0) {
+      handleWrite(b, off, cellSize, true, false);
+      off += cellSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+    if (lastCellSize > 0) {
+      handleWrite(b, off, lastCellSize, false, false);
+      checkAndWriteParityCells();
+    }
+    writeOffset += len;
+  }
+
+  private void checkAndWriteParityCells() throws IOException {
+    //check data blocks finished
+    //If index is > datanum blks
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      //Lets encode and write
+      //encoder.encode();
+      writeParityCells();
+      // check if block ends?
+      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+          .getLength()) {
+        blockOutputStreamEntryPool.endECBlock(numDataBlks);
+        currentBlockGroupLen = 0;
+      }
+    }
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = ecChunkBufferCache.getBuffers();
+    //encode the data cells
+    for (int i = 0; i < numDataBlks; i++) {
+      buffers[i].flip();
+    }
+    encode(encoder, numDataBlks, buffers);
+    for (int i = numDataBlks; i < numDataBlks + 2; i++) {
+      handleWrite(buffers[i].array(), 0, cellSize, true, true);
+    }
+
+    ecChunkBufferCache.flipAllDataBuffers();

Review comment:
       Can you please help me understand why we need these flips?
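
As background for this question (an editorial aside, not part of the review thread): the flips follow the standard `ByteBuffer` write-then-read pattern. A minimal sketch, assuming a cell filled with `put()` and then handed to a consumer such as the encoder:

```java
import java.nio.ByteBuffer;

public class FlipSketch {
  public static void main(String[] args) {
    ByteBuffer cell = ByteBuffer.allocate(8);
    cell.put(new byte[] {1, 2, 3});   // write mode: position = 3, limit = 8

    // flip() switches the buffer to read mode: position = 0, limit = 3.
    // Without it, a consumer (such as an encoder) would start reading at
    // position 3 and see only the unwritten tail of the buffer.
    cell.flip();

    byte[] out = new byte[cell.remaining()];
    cell.get(out);                    // reads exactly the 3 bytes written
    System.out.println(out.length);   // 3
  }
}
```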

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
##########
@@ -155,6 +156,9 @@ public BlockOutputStream(
     this.token = token;
 
     replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
+    if (replicationIndex > 0) {

Review comment:
       I think it would be better to differentiate between the Ratis and EC `BlockOutputStream` and `BlockOutputStreamEntryPool`. That would be safer (the Ratis code path shouldn't be changed), easier to test, and better structured. I think this part can be added to a subclass (e.g. `ECBlockOutputStream`) which can override the required parts.
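
A rough, self-contained sketch of the structure being suggested (the class and method names below are placeholders for illustration, not the actual Ozone classes):

```java
import java.io.IOException;
import java.io.OutputStream;

// Base class carries the common (Ratis/standalone) write path.
class BaseBlockOutputStream extends OutputStream {
  @Override
  public void write(int b) throws IOException {
    // shared write logic
  }
}

// EC-specific state and behaviour live only in the subclass, so the
// existing code path in the base class stays untouched.
class EcBlockOutputStream extends BaseBlockOutputStream {
  private final int replicationIndex;

  EcBlockOutputStream(int replicationIndex) {
    this.replicationIndex = replicationIndex;
  }

  @Override
  public void write(int b) throws IOException {
    // EC-specific handling could go here, then delegate to the shared path.
    super.write(b);
  }
}
```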

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec;
+
+  private long currentBlockGroupLen = 0;
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyOutputStream.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much of data is actually written yet to underlying stream
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception is encountered while write and whole write could
+  // not succeed
+  private boolean isException;
+  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
+  }
+
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  }
+
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion) {
+    ecChunkBufferCache =
+        new ECChunkBuffers(cellSize, numDataBlks, numParityBlks);
+    this.config = config;
+    OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(config, omClient, requestId,
+            replicationConfig, uploadID, partNumber, isMultipart, info,
+            unsafeByteBufferConversion, xceiverClientManager, handler.getId(),
+            true);
+
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.retryPolicyMap = HddsClientUtils
+        .getRetryPolicyByException(config.getMaxRetryCount(),
+            config.getRetryInterval());
+    this.retryCount = 0;
+    this.isException = false;
+    this.writeOffset = 0;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    this.codec = new RSErasureCodec(conf, options);
+    this.encoder = CodecUtil.createRawEncoder(conf,
+        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+        codec.getCoderOptions());
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version     the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
+  }
+
+  /**
+   * Try to write the bytes sequence b[off:off+len) to underlying EC block
+   * streams.
+   *
+   * @param b   byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    int currentChunkBufferRemainingLength =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .remaining();
+    int currentChunkBufferLen =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .position();
+    int maxLenToCurrChunkBuffer = (int) Math.min(len, cellSize);
+    int currentWriterChunkLenToWrite =
+        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == cellSize,
+        false);
+    checkAndWriteParityCells();
+
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / cellSize;
+    int lastCellSize = remLen % cellSize;
+    while (iters > 0) {
+      handleWrite(b, off, cellSize, true, false);
+      off += cellSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+    if (lastCellSize > 0) {
+      handleWrite(b, off, lastCellSize, false, false);
+      checkAndWriteParityCells();
+    }
+    writeOffset += len;
+  }
+
+  private void checkAndWriteParityCells() throws IOException {
+    //check data blocks finished
+    //If index is > datanum blks
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      //Lets encode and write
+      //encoder.encode();
+      writeParityCells();
+      // check if block ends?
+      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+          .getLength()) {
+        blockOutputStreamEntryPool.endECBlock(numDataBlks);
+        currentBlockGroupLen = 0;
+      }
+    }
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = ecChunkBufferCache.getBuffers();
+    //encode the data cells
+    for (int i = 0; i < numDataBlks; i++) {
+      buffers[i].flip();
+    }
+    encode(encoder, numDataBlks, buffers);
+    for (int i = numDataBlks; i < numDataBlks + 2; i++) {
+      handleWrite(buffers[i].array(), 0, cellSize, true, true);
+    }
+
+    ecChunkBufferCache.flipAllDataBuffers();
+    ecChunkBufferCache.clear();
+  }
+
+  private static void encode(RawErasureEncoder encoder, int numData,
+      ByteBuffer[] buffers) throws IOException {
+    final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
+    final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
+    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);

Review comment:
       Do we really need arraycopy here? As far as I understand, we don't use these buffers anywhere else until we do this encoding. 

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
##########
@@ -258,6 +262,7 @@ public void write(byte[] b, int off, int len) throws IOException {
     while (len > 0) {
       allocateNewBufferIfNeeded();
       final int writeLen = Math.min(currentBufferRemaining, len);
+      LOG.info("writeLen: " + writeLen + "  off: " + off);

Review comment:
       A remnant of a debug session, I guess...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r658012928



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;

Review comment:
       I guess these are hard-coded to 3, 2 for POC work, but it would probably make sense to just pass an ECReplicationConfig into this constructor now and use the values supplied in it.
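
       Rough sketch of what I mean, assuming ECReplicationConfig exposes the data/parity counts via getData()/getParity() (accessor names are my guess — adjust to whatever the class actually provides):

           public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
               XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
               int chunkSize, String requestId, ECReplicationConfig repConfig,
               String uploadID, int partNumber, boolean isMultipart,
               boolean unsafeByteBufferConversion) {
             // Derive the EC layout from the replication config instead of the
             // hard-coded POC constants above.
             this.numDataBlks = repConfig.getData();
             this.numParityBlks = repConfig.getParity();
             this.ecChunkBufferCache =
                 new ECChunkBuffers(cellSize, numDataBlks, numParityBlks);
             // ... rest of the constructor unchanged from the current patch
           }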




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r667007506



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class manages the stream entries list and handles block allocation
+ * from OzoneManager for EC writes.
+ */
+public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
+  private final List<BlockOutputStreamEntry> finishedStreamEntries;
+  private final ECReplicationConfig ecReplicationConfig;
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECBlockOutputStreamEntryPool(OzoneClientConfig config,
+      OzoneManagerProtocol omClient,
+      String requestId,
+      ReplicationConfig replicationConfig,
+      String uploadID,
+      int partNumber,
+      boolean isMultipart,
+      OmKeyInfo info,
+      boolean unsafeByteBufferConversion,
+      XceiverClientFactory xceiverClientFactory,
+      long openID) {
+    super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
+        isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
+        openID);
+    this.finishedStreamEntries = new ArrayList<>();
+    assert replicationConfig instanceof ECReplicationConfig;
+    this.ecReplicationConfig = (ECReplicationConfig) replicationConfig;
+  }
+
+  @Override
+  void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+    Preconditions.checkNotNull(subKeyInfo.getPipeline());
+    List<DatanodeDetails> nodes = subKeyInfo.getPipeline().getNodes();
+    for (int i = 0; i < nodes.size(); i++) {
+      List<DatanodeDetails> nodeStatus = new ArrayList<>();
+      nodeStatus.add(nodes.get(i));
+      Pipeline pipeline =
+          Pipeline.newBuilder().setId(subKeyInfo.getPipeline().getId())
+              .setReplicationConfig(
+                  subKeyInfo.getPipeline().getReplicationConfig())
+              .setState(subKeyInfo.getPipeline().getPipelineState())
+              .setNodes(nodeStatus).build();
+      Map<DatanodeDetails, Integer> nodeVsIdx = new HashMap<>();
+      nodeVsIdx.put(nodes.get(i), i + 1);
+      pipeline.setReplicaIndexes(nodeVsIdx);

Review comment:
       Can you use Pipeline.Builder to set the ReplicaIndexes? That way setReplicaIndexes can stay private and Pipeline remains immutable, as it is now.
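
       Roughly like this — setReplicaIndexes(...) on the Builder is the part we would have to add, so the indexes are supplied at build time and never mutated afterwards:

           Map<DatanodeDetails, Integer> nodeVsIdx = new HashMap<>();
           nodeVsIdx.put(nodes.get(i), i + 1);
           Pipeline pipeline = Pipeline.newBuilder()
               .setId(subKeyInfo.getPipeline().getId())
               .setReplicationConfig(subKeyInfo.getPipeline().getReplicationConfig())
               .setState(subKeyInfo.getPipeline().getPipelineState())
               .setNodes(nodeStatus)
               .setReplicaIndexes(nodeVsIdx)   // proposed Builder method
               .build();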




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] elek commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
elek commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r652815649



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec =
+      new RSErasureCodec(new Configuration(), options);

Review comment:
       new Configuration()




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] umamaheswararao commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r659119610



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;

Review comment:
       Extracted from replConfig now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r658011237



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec;
+
+  private long currentBlockGroupLen = 0;
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyOutputStream.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much of data is actually written yet to underlying stream
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception is encountered while write and whole write could
+  // not succeed
+  private boolean isException;
+  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
+  }
+
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  }
+
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion) {
+    ecChunkBufferCache =
+        new ECChunkBuffers(cellSize, numDataBlks, numParityBlks);
+    this.config = config;
+    OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(config, omClient, requestId,
+            replicationConfig, uploadID, partNumber, isMultipart, info,
+            unsafeByteBufferConversion, xceiverClientManager, handler.getId(),
+            true);
+
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.retryPolicyMap = HddsClientUtils
+        .getRetryPolicyByException(config.getMaxRetryCount(),
+            config.getRetryInterval());
+    this.retryCount = 0;
+    this.isException = false;
+    this.writeOffset = 0;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    this.codec = new RSErasureCodec(conf, options);
+    this.encoder = CodecUtil.createRawEncoder(conf,
+        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+        codec.getCoderOptions());
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version     the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
+  }
+
+  /**
+   * Try to write the bytes sequence b[off:off+len) to underlying EC block
+   * streams.
+   *
+   * @param b   byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    int currentChunkBufferRemainingLength =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .remaining();
+    int currentChunkBufferLen =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .position();
+    int maxLenToCurrChunkBuffer = (int) Math.min(len, cellSize);
+    int currentWriterChunkLenToWrite =
+        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == cellSize,
+        false);
+    checkAndWriteParityCells();
+
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / cellSize;
+    int lastCellSize = remLen % cellSize;
+    while (iters > 0) {
+      handleWrite(b, off, cellSize, true, false);
+      off += cellSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+    if (lastCellSize > 0) {
+      handleWrite(b, off, lastCellSize, false, false);
+      checkAndWriteParityCells();
+    }
+    writeOffset += len;
+  }
+
+  private void checkAndWriteParityCells() throws IOException {
+    //check data blocks finished
+    //If index is > datanum blks
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      //Lets encode and write
+      //encoder.encode();
+      writeParityCells();
+      // check if block ends?
+      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+          .getLength()) {
+        blockOutputStreamEntryPool.endECBlock(numDataBlks);
+        currentBlockGroupLen = 0;
+      }
+    }
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = ecChunkBufferCache.getBuffers();
+    //encode the data cells
+    for (int i = 0; i < numDataBlks; i++) {
+      buffers[i].flip();
+    }
+    encode(encoder, numDataBlks, buffers);
+    for (int i = numDataBlks; i < numDataBlks + 2; i++) {
+      handleWrite(buffers[i].array(), 0, cellSize, true, true);
+    }
+
+    ecChunkBufferCache.flipAllDataBuffers();
+    ecChunkBufferCache.clear();
+  }
+
+  private static void encode(RawErasureEncoder encoder, int numData,
+      ByteBuffer[] buffers) throws IOException {
+    final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
+    final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
+    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
+    System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
+
+    encoder.encode(dataBuffers, parityBuffers);
+  }
+
+  private void handleWrite(byte[] b, int off, long len, boolean isFullCell,
+      boolean isParity) throws IOException {
+    ecChunkBufferCache
+        .addTo(blockOutputStreamEntryPool.getCurrIdx(), b, off, (int) len);
+    while (len > 0) {
+      try {
+
+        BlockOutputStreamEntry current =
+            blockOutputStreamEntryPool.allocateBlockIfNeeded();
+        // length(len) will be in int range if the call is happening through
+        // write API of blockOutputStream. Length can be in long range if it
+        // comes via Exception path.
+        int expectedWriteLen =
+            Math.min((int) len, (int) current.getRemaining());
+        long currentPos = current.getWrittenDataLength();
+        // writeLen will be updated based on whether the write was succeeded
+        // or if it sees an exception, how much the actual write was
+        // acknowledged.
+        int writtenLength =
+            writeToOutputStream(current, len, b, expectedWriteLen, off,
+                currentPos, isParity);
+        currentBlockGroupLen += isParity ? 0 : writtenLength;
+        if (current.getRemaining() <= 0) {
+          // since the current block is already written close the stream.
+          closeCurrentStream(StreamAction.CLOSE);
+        }
+
+        len -= writtenLength;
+        off += writtenLength;
+
+      } catch (Exception e) {
+        markStreamClosed();
+        throw new IOException(e);
+      }
+
+      if (isFullCell) {
+        handleFlushOrClose(StreamAction.FLUSH);
+        blockOutputStreamEntryPool.updateToNextStream(numDataBlks + 2);

Review comment:
       This "2" should be "numParityBlks" I think.
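
       i.e. something like the following, so the stride stays correct for any EC scheme:

           blockOutputStreamEntryPool.updateToNextStream(numDataBlks + numParityBlks);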




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] umamaheswararao commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r658806450



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
##########
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int cellSize = 1000;
+  private int numDataBlks = 3;
+  private int numParityBlks = 2;
+  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private ECSchema schema =
+      new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+  private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private RSErasureCodec codec;
+
+  private long currentBlockGroupLen = 0;
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyOutputStream.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much of data is actually written yet to underlying stream
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception is encountered while write and whole write could
+  // not succeed
+  private boolean isException;
+  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
+  }
+
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  }
+
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion) {
+    ecChunkBufferCache =
+        new ECChunkBuffers(cellSize, numDataBlks, numParityBlks);
+    this.config = config;
+    OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(config, omClient, requestId,
+            replicationConfig, uploadID, partNumber, isMultipart, info,
+            unsafeByteBufferConversion, xceiverClientManager, handler.getId(),
+            true);
+
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.retryPolicyMap = HddsClientUtils
+        .getRetryPolicyByException(config.getMaxRetryCount(),
+            config.getRetryInterval());
+    this.retryCount = 0;
+    this.isException = false;
+    this.writeOffset = 0;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    this.codec = new RSErasureCodec(conf, options);
+    this.encoder = CodecUtil.createRawEncoder(conf,
+        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+        codec.getCoderOptions());
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version     the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
+  }
+
+  /**
+   * Try to write the bytes sequence b[off:off+len) to underlying EC block
+   * streams.
+   *
+   * @param b   byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    int currentChunkBufferRemainingLength =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .remaining();
+    int currentChunkBufferLen =
+        ecChunkBufferCache.buffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .position();
+    int maxLenToCurrChunkBuffer = (int) Math.min(len, cellSize);
+    int currentWriterChunkLenToWrite =
+        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == cellSize,
+        false);
+    checkAndWriteParityCells();
+
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / cellSize;
+    int lastCellSize = remLen % cellSize;
+    while (iters > 0) {
+      handleWrite(b, off, cellSize, true, false);
+      off += cellSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+    if (lastCellSize > 0) {
+      handleWrite(b, off, lastCellSize, false, false);
+      checkAndWriteParityCells();
+    }
+    writeOffset += len;
+  }
+
+  private void checkAndWriteParityCells() throws IOException {
+    //check data blocks finished
+    //If index is > datanum blks
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      //Lets encode and write
+      //encoder.encode();
+      writeParityCells();
+      // check if block ends?
+      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+          .getLength()) {
+        blockOutputStreamEntryPool.endECBlock(numDataBlks);
+        currentBlockGroupLen = 0;
+      }
+    }
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = ecChunkBufferCache.getBuffers();
+    //encode the data cells
+    for (int i = 0; i < numDataBlks; i++) {
+      buffers[i].flip();
+    }
+    encode(encoder, numDataBlks, buffers);
+    for (int i = numDataBlks; i < numDataBlks + 2; i++) {
+      handleWrite(buffers[i].array(), 0, cellSize, true, true);
+    }
+
+    ecChunkBufferCache.flipAllDataBuffers();

Review comment:
       Thanks. As I clarified already, we don't need this; I removed it in the latest patch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2335: HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes.

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2335:
URL: https://github.com/apache/ozone/pull/2335#discussion_r667009300



##########
File path: hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
##########
@@ -443,7 +443,7 @@ public static Builder newBuilder(Pipeline pipeline) {
     return new Builder(pipeline);
   }
 
-  private void setReplicaIndexes(Map<DatanodeDetails, Integer> replicaIndexes) {
+  public void setReplicaIndexes(Map<DatanodeDetails, Integer> replicaIndexes) {

Review comment:
       I think this should remain private to keep Pipeline immutable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org