Posted to commits@ozone.apache.org by um...@apache.org on 2022/03/01 19:22:09 UTC

[ozone] branch HDDS-3816-ec updated: HDDS-6364. EC: Discard pre-allocated blocks to eliminate worthless retries. (#3127)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 38409ea  HDDS-6364. EC: Discard pre-allocated blocks to eliminate worthless retries. (#3127)
38409ea is described below

commit 38409ea5a052be80bc754842fd978e3b4498eeed
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Wed Mar 2 03:21:12 2022 +0800

    HDDS-6364. EC: Discard pre-allocated blocks to eliminate worthless retries. (#3127)
---
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  |  13 ++-
 .../hadoop/ozone/client/TestOzoneECClient.java     | 127 +++++++++++++++++++++
 .../ozone/client/io/BlockStreamAccessor.java       |  51 +++++++++
 3 files changed, 187 insertions(+), 4 deletions(-)

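For readers skimming the patch: on a failed EC stripe write, the client now resets the current stream entry to its acked position, drops every pre-allocated block that belongs to the same (failed) pipeline, closes the entry, and rewrites the stripe into a new block group. The pool-side discardPreallocatedBlocks method itself is not part of this diff; the sketch below only illustrates the kind of filtering it is expected to perform, and the field and variable names in it are illustrative assumptions rather than the actual Ozone implementation. Based on the call site further down, the -1 passed for containerID appears to mean "no container filter, discard by pipeline only".

    // Hedged sketch -- illustrative only, not the real BlockOutputStreamEntryPool code.
    // Drops pre-allocated (not yet written) entries that sit after the current one
    // and target the failed pipeline (or, when containerID != -1, the failed
    // container), so retries do not keep re-picking blocks on a pipeline that is
    // already known to be bad.
    void discardPreallocatedBlocks(long containerID, PipelineID pipelineID) {
      // streamEntries and currentStreamIndex are assumed pool-internal state.
      ListIterator<BlockOutputStreamEntry> it =
          streamEntries.listIterator(currentStreamIndex + 1);
      while (it.hasNext()) {
        BlockOutputStreamEntry entry = it.next();
        boolean onFailedPipeline = pipelineID != null
            && pipelineID.equals(entry.getPipeline().getId());
        boolean inFailedContainer = containerID != -1
            && entry.getBlockID().getContainerID() == containerID;
        if (onFailedPipeline || inFailedContainer) {
          it.remove();
        }
      }
    }

With the pre-allocated blocks on the failed pipeline gone, each retry allocates a fresh block group instead of burning a retry attempt on the same bad pipeline, which is what the new test below verifies.
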
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 11172ad..34a6957 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -191,14 +191,19 @@ public class ECKeyOutputStream extends KeyOutputStream {
       failedParityStripeChunkLens[i] = parityBuffers[i].limit();
     }
 
-    blockOutputStreamEntryPool.getCurrentStreamEntry().resetToFirstEntry();
     // Rollback the length/offset updated as part of this failed stripe write.
     offset -= failedStripeDataSize;
-    blockOutputStreamEntryPool.getCurrentStreamEntry()
-        .resetToAckedPosition();
 
+    final ECBlockOutputStreamEntry failedStreamEntry =
+        blockOutputStreamEntryPool.getCurrentStreamEntry();
+    failedStreamEntry.resetToFirstEntry();
+    failedStreamEntry.resetToAckedPosition();
+    // All pre-allocated blocks from the same pipeline
+    // should be dropped to eliminate worthless retries.
+    blockOutputStreamEntryPool.discardPreallocatedBlocks(-1,
+        failedStreamEntry.getPipeline().getId());
     // Let's close the current entry.
-    blockOutputStreamEntryPool.getCurrentStreamEntry().close();
+    failedStreamEntry.close();
 
     // Let's rewrite the last stripe, so that it will be written to new block
     // group.
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 0ee6808..38b0940 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -28,11 +28,16 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
+import org.apache.hadoop.ozone.client.io.BlockStreamAccessor;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.ozone.erasurecode.rawcoder.RSRawErasureCoderFactory;
@@ -908,6 +913,109 @@ public class TestOzoneECClient {
     }
   }
 
+  @Test
+  public void testDiscardPreAllocatedBlocksPreventRetryExceeds()
+      throws IOException {
+    close();
+    OzoneConfiguration con = new OzoneConfiguration();
+    int maxRetries = 3;
+    con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
+        2, StorageUnit.KB);
+    con.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES,
+        maxRetries);
+    MultiNodePipelineBlockAllocator blkAllocator =
+        new MultiNodePipelineBlockAllocator(con, dataBlocks + parityBlocks,
+            15);
+    createNewClient(con, blkAllocator);
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    int numStripesBeforeFailure = 1;
+    int numStripesAfterFailure = 1;
+    int numStripesTotal = numStripesBeforeFailure + numStripesAfterFailure;
+    int numExpectedBlockGrps = 2;
+    // fail any DNs to trigger retry
+    int[] nodesIndexesToMarkFailure = {0, 1};
+    long keySize = (long) chunkSize * dataBlocks * numStripesTotal;
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, keySize,
+        new ECReplicationConfig(dataBlocks, parityBlocks,
+            ECReplicationConfig.EcCodec.RS,
+            chunkSize), new HashMap<>())) {
+      Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
+      ECKeyOutputStream kos = (ECKeyOutputStream) out.getOutputStream();
+      List<OmKeyLocationInfo> blockInfos = getAllLocationInfoList(kos);
+      Assert.assertEquals(1, blockInfos.size());
+
+      // Mock some pre-allocated blocks to the key,
+      // should be > maxRetries
+      int numPreAllocatedBlocks = maxRetries + 1;
+      BlockID blockID = blockInfos.get(0).getBlockID();
+      Pipeline pipeline = blockInfos.get(0).getPipeline();
+      List<OmKeyLocationInfo> omKeyLocationInfos = new ArrayList<>();
+      for (int i = 0; i < numPreAllocatedBlocks; i++) {
+        BlockID nextBlockID = new BlockID(blockID.getContainerID(),
+            blockID.getLocalID() + i + 1);
+        omKeyLocationInfos.add(new OmKeyLocationInfo.Builder()
+            .setBlockID(nextBlockID)
+            .setPipeline(pipeline)
+            .build());
+      }
+      OmKeyLocationInfoGroup omKeyLocationInfoGroup =
+          new OmKeyLocationInfoGroup(0, omKeyLocationInfos);
+      kos.addPreallocateBlocks(omKeyLocationInfoGroup, 0);
+
+      // Good writes
+      for (int j = 0; j < numStripesBeforeFailure; j++) {
+        for (int i = 0; i < dataBlocks; i++) {
+          out.write(inputChunks[i]);
+        }
+      }
+
+      // Make the writes fail to trigger retry
+      List<DatanodeDetails> failedDNs = new ArrayList<>();
+      List<HddsProtos.DatanodeDetailsProto> dns = allocator.getClusterDns();
+      for (int j = 0; j < nodesIndexesToMarkFailure.length; j++) {
+        failedDNs.add(DatanodeDetails
+            .getFromProtoBuf(dns.get(nodesIndexesToMarkFailure[j])));
+      }
+      // First let's set storage as bad
+      ((MockXceiverClientFactory) factoryStub).setFailedStorages(failedDNs);
+
+      // Writes that will retry due to failed DNs
+      try {
+        for (int j = 0; j < numStripesAfterFailure; j++) {
+          for (int i = 0; i < dataBlocks; i++) {
+            out.write(inputChunks[i]);
+          }
+        }
+      } catch (IOException e) {
+        // If we don't discard pre-allocated blocks,
+        // retries should exceed the maxRetries and write will fail.
+        Assert.fail("Max retries exceeded");
+      }
+    }
+
+    final OzoneKeyDetails key = bucket.getKey(keyName);
+    Assert.assertEquals(numExpectedBlockGrps,
+        key.getOzoneKeyLocations().size());
+
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[chunkSize];
+      for (int i = 0; i < dataBlocks * numStripesTotal; i++) {
+        Assert.assertEquals(inputChunks[i % dataBlocks].length,
+            is.read(fileContent));
+        Assert.assertArrayEquals(
+            "Expected: " + new String(inputChunks[i % dataBlocks], UTF_8)
+                + " \n " + "Actual: " + new String(fileContent, UTF_8),
+            inputChunks[i % dataBlocks], fileContent);
+      }
+    }
+  }
+
   private OzoneBucket writeIntoECKey(byte[] data, String key,
       DefaultReplicationConfig defaultReplicationConfig) throws IOException {
     return writeIntoECKey(new byte[][] {data}, key, defaultReplicationConfig);
@@ -936,4 +1044,23 @@ public class TestOzoneECClient {
     }
     return bucket;
   }
+
+  private List<OmKeyLocationInfo> getAllLocationInfoList(
+      ECKeyOutputStream kos) {
+    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    for (BlockOutputStreamEntry streamEntry : kos.getStreamEntries()) {
+      BlockStreamAccessor streamAccessor =
+          new BlockStreamAccessor(streamEntry);
+      OmKeyLocationInfo info =
+          new OmKeyLocationInfo.Builder()
+              .setBlockID(streamAccessor.getStreamBlockID())
+              .setLength(streamAccessor.getStreamCurrentPosition())
+              .setOffset(0)
+              .setToken(streamAccessor.getStreamToken())
+              .setPipeline(streamAccessor.getStreamPipeline())
+              .build();
+      locationInfoList.add(info);
+    }
+    return locationInfoList;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/BlockStreamAccessor.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/BlockStreamAccessor.java
new file mode 100644
index 0000000..d9a7211
--- /dev/null
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/BlockStreamAccessor.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * An accessor helper class to BlockOutputStreamEntry for test.
+ */
+public class BlockStreamAccessor {
+
+  private final BlockOutputStreamEntry streamEntry;
+
+  public BlockStreamAccessor(BlockOutputStreamEntry entry) {
+    this.streamEntry = entry;
+  }
+
+  public BlockID getStreamBlockID() {
+    return streamEntry.getBlockID();
+  }
+
+  public Pipeline getStreamPipeline() {
+    return streamEntry.getPipeline();
+  }
+
+  public Token<OzoneBlockTokenIdentifier> getStreamToken() {
+    return streamEntry.getToken();
+  }
+
+  public long getStreamCurrentPosition() {
+    return streamEntry.getCurrentPosition();
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org