You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/03/01 19:22:09 UTC
[ozone] branch HDDS-3816-ec updated: HDDS-6364. EC: Discard pre-allocated blocks to eliminate worthless retries. (#3127)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 38409ea HDDS-6364. EC: Discard pre-allocated blocks to eliminate worthless retries. (#3127)
38409ea is described below
commit 38409ea5a052be80bc754842fd978e3b4498eeed
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Wed Mar 2 03:21:12 2022 +0800
HDDS-6364. EC: Discard pre-allocated blocks to eliminate worthless retries. (#3127)
---
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 13 ++-
.../hadoop/ozone/client/TestOzoneECClient.java | 127 +++++++++++++++++++++
.../ozone/client/io/BlockStreamAccessor.java | 51 +++++++++
3 files changed, 187 insertions(+), 4 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 11172ad..34a6957 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -191,14 +191,19 @@ public class ECKeyOutputStream extends KeyOutputStream {
failedParityStripeChunkLens[i] = parityBuffers[i].limit();
}
- blockOutputStreamEntryPool.getCurrentStreamEntry().resetToFirstEntry();
// Rollback the length/offset updated as part of this failed stripe write.
offset -= failedStripeDataSize;
- blockOutputStreamEntryPool.getCurrentStreamEntry()
- .resetToAckedPosition();
+ final ECBlockOutputStreamEntry failedStreamEntry =
+ blockOutputStreamEntryPool.getCurrentStreamEntry();
+ failedStreamEntry.resetToFirstEntry();
+ failedStreamEntry.resetToAckedPosition();
+ // All pre-allocated blocks from the same pipeline
+ // should be dropped to eliminate worthless retries.
+ blockOutputStreamEntryPool.discardPreallocatedBlocks(-1,
+ failedStreamEntry.getPipeline().getId());
// Let's close the current entry.
- blockOutputStreamEntryPool.getCurrentStreamEntry().close();
+ failedStreamEntry.close();
// Let's rewrite the last stripe, so that it will be written to new block
// group.
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 0ee6808..38b0940 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -28,11 +28,16 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
+import org.apache.hadoop.ozone.client.io.BlockStreamAccessor;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.ozone.erasurecode.rawcoder.RSRawErasureCoderFactory;
@@ -908,6 +913,109 @@ public class TestOzoneECClient {
}
}
+  /**
+   * Pre-allocated blocks on a failed pipeline must be discarded; otherwise
+   * each stripe-write retry reuses one of them and maxRetries is exceeded.
+   */
+  @Test
+  public void testDiscardPreAllocatedBlocksPreventRetryExceeds()
+      throws IOException {
+    close();
+    OzoneConfiguration con = new OzoneConfiguration();
+    int maxRetries = 3;
+    con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
+        2, StorageUnit.KB);
+    con.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES,
+        maxRetries);
+    MultiNodePipelineBlockAllocator blkAllocator =
+        new MultiNodePipelineBlockAllocator(con, dataBlocks + parityBlocks,
+            15);
+    createNewClient(con, blkAllocator);
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    int numStripesBeforeFailure = 1;
+    int numStripesAfterFailure = 1;
+    int numStripesTotal = numStripesBeforeFailure + numStripesAfterFailure;
+    int numExpectedBlockGrps = 2;
+    // DNs (by cluster index) to mark failed, forcing a stripe-write retry.
+    int[] nodesIndexesToMarkFailure = {0, 1};
+    long keySize = (long) chunkSize * dataBlocks * numStripesTotal;
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, keySize,
+        new ECReplicationConfig(dataBlocks, parityBlocks,
+            ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
+      Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
+      ECKeyOutputStream kos = (ECKeyOutputStream) out.getOutputStream();
+      List<OmKeyLocationInfo> blockInfos = getAllLocationInfoList(kos);
+      Assert.assertEquals(1, blockInfos.size());
+
+      // Pre-allocate more same-pipeline blocks than maxRetries permits.
+      int numPreAllocatedBlocks = maxRetries + 1;
+      BlockID blockID = blockInfos.get(0).getBlockID();
+      Pipeline pipeline = blockInfos.get(0).getPipeline();
+      List<OmKeyLocationInfo> omKeyLocationInfos = new ArrayList<>();
+      for (int i = 0; i < numPreAllocatedBlocks; i++) {
+        BlockID nextBlockID = new BlockID(blockID.getContainerID(),
+            blockID.getLocalID() + i + 1);
+        omKeyLocationInfos.add(new OmKeyLocationInfo.Builder()
+            .setBlockID(nextBlockID)
+            .setPipeline(pipeline)
+            .build());
+      }
+      OmKeyLocationInfoGroup omKeyLocationInfoGroup =
+          new OmKeyLocationInfoGroup(0, omKeyLocationInfos);
+      kos.addPreallocateBlocks(omKeyLocationInfoGroup, 0);
+
+      // Good writes
+      for (int j = 0; j < numStripesBeforeFailure; j++) {
+        for (int i = 0; i < dataBlocks; i++) {
+          out.write(inputChunks[i]);
+        }
+      }
+
+      // Make the writes fail to trigger retry
+      List<DatanodeDetails> failedDNs = new ArrayList<>();
+      List<HddsProtos.DatanodeDetailsProto> dns = blkAllocator.getClusterDns();
+      for (int index : nodesIndexesToMarkFailure) {
+        failedDNs.add(DatanodeDetails.getFromProtoBuf(dns.get(index)));
+      }
+      // First let's set storage as bad
+      ((MockXceiverClientFactory) factoryStub).setFailedStorages(failedDNs);
+
+      // Writes that will retry due to failed DNs
+      try {
+        for (int j = 0; j < numStripesAfterFailure; j++) {
+          for (int i = 0; i < dataBlocks; i++) {
+            out.write(inputChunks[i]);
+          }
+        }
+      } catch (IOException e) {
+        // Without the discard, retries exhaust maxRetries on the bad pipeline.
+        throw new AssertionError("Max retries exceeded", e);
+      }
+    }
+
+    final OzoneKeyDetails key = bucket.getKey(keyName);
+    Assert.assertEquals(numExpectedBlockGrps,
+        key.getOzoneKeyLocations().size());
+
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[chunkSize];
+      for (int i = 0; i < dataBlocks * numStripesTotal; i++) {
+        Assert.assertEquals(inputChunks[i % dataBlocks].length,
+            is.read(fileContent));
+        Assert.assertArrayEquals(
+            "Expected: " + new String(inputChunks[i % dataBlocks], UTF_8)
+                + " \n " + "Actual: " + new String(fileContent, UTF_8),
+            inputChunks[i % dataBlocks], fileContent);
+      }
+    }
+  }
+
private OzoneBucket writeIntoECKey(byte[] data, String key,
DefaultReplicationConfig defaultReplicationConfig) throws IOException {
return writeIntoECKey(new byte[][] {data}, key, defaultReplicationConfig);
@@ -936,4 +1044,23 @@ public class TestOzoneECClient {
}
return bucket;
}
+
+  /** Snapshots every stream entry of {@code kos} as an OmKeyLocationInfo. */
+  private List<OmKeyLocationInfo> getAllLocationInfoList(
+      ECKeyOutputStream kos) {
+    List<OmKeyLocationInfo> infos = new ArrayList<>();
+    for (BlockOutputStreamEntry entry : kos.getStreamEntries()) {
+      // Entry getters are reached via the same-package test accessor.
+      BlockStreamAccessor accessor = new BlockStreamAccessor(entry);
+      // Offset is fixed at 0; length is the entry's current write position.
+      infos.add(new OmKeyLocationInfo.Builder()
+          .setBlockID(accessor.getStreamBlockID())
+          .setLength(accessor.getStreamCurrentPosition())
+          .setOffset(0)
+          .setToken(accessor.getStreamToken())
+          .setPipeline(accessor.getStreamPipeline())
+          .build());
+    }
+    return infos;
+  }
}
\ No newline at end of file
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/BlockStreamAccessor.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/BlockStreamAccessor.java
new file mode 100644
index 0000000..d9a7211
--- /dev/null
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/BlockStreamAccessor.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Test-only view of a {@link BlockOutputStreamEntry}: each getStreamXxx
+ * method simply delegates to the wrapped entry's corresponding getter.
+ */
+public class BlockStreamAccessor {
+  private final BlockOutputStreamEntry entry;
+
+  public BlockStreamAccessor(BlockOutputStreamEntry entry) {
+    this.entry = entry;
+  }
+
+  public BlockID getStreamBlockID() {
+    return entry.getBlockID();
+  }
+
+  public Pipeline getStreamPipeline() {
+    return entry.getPipeline();
+  }
+
+  public Token<OzoneBlockTokenIdentifier> getStreamToken() {
+    return entry.getToken();
+  }
+
+  public long getStreamCurrentPosition() {
+    return entry.getCurrentPosition();
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org