Posted to commits@ozone.apache.org by um...@apache.org on 2021/12/03 22:51:15 UTC
[ozone] branch HDDS-3816-ec updated: HDDS-5955: EC: Track the failed servers to add into the excludeList when invoking allocateBlock (#2849)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 1b9724d HDDS-5955: EC: Track the failed servers to add into the excludeList when invoking allocateBlock (#2849)
1b9724d is described below
commit 1b9724d60a7fcb485e3b837d028b50536e86c061
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Fri Dec 3 14:50:45 2021 -0800
HDDS-5955: EC: Track the failed servers to add into the excludeList when invoking allocateBlock (#2849)
Co-authored-by: Uma Maheswara Rao G <um...@cloudera.com>
---
.../hdds/scm/storage/ECBlockOutputStream.java | 30 +++++++++
.../SCMContainerPlacementRackScatter.java | 4 ++
.../ozone/client/io/ECBlockOutputStreamEntry.java | 34 ++++++----
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 71 ++++++++++++--------
.../hadoop/ozone/client/io/KeyOutputStream.java | 2 +-
.../ozone/client/rpc/TestECKeyOutputStream.java | 78 +++++++++++++++++++---
6 files changed, 169 insertions(+), 50 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 5ee34d3..66e550f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.storage;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -41,6 +42,7 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlock
*/
public class ECBlockOutputStream extends BlockOutputStream{
+ private final DatanodeDetails datanodeDetails;
private CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
currentChunkRspFuture = null;
@@ -64,6 +66,8 @@ public class ECBlockOutputStream extends BlockOutputStream{
) throws IOException {
super(blockID, xceiverClientManager,
pipeline, bufferPool, config, token);
+ // In an EC stream, there will be only one node in the pipeline.
+ this.datanodeDetails = pipeline.getClosestNode();
}
@Override
@@ -144,4 +148,30 @@ public class ECBlockOutputStream extends BlockOutputStream{
getCurrentPutBlkResponseFuture() {
return this.putBlkRspFuture;
}
+
+ /**
+ * Gets the target data node used in the current stream.
+ * @return DatanodeDetails
+ */
+ public DatanodeDetails getDatanodeDetails() {
+ return datanodeDetails;
+ }
+
+ @Override
+ void validateResponse(
+ ContainerProtos.ContainerCommandResponseProto responseProto)
+ throws IOException {
+ try {
+ // If the ioException is already set, a previous request has failed;
+ // just return here, and the current operation will fail with the
+ // original error.
+ IOException exception = getIoException();
+ if (exception != null) {
+ return;
+ }
+ ContainerProtocolCalls.validateContainerResponse(responseProto);
+ } catch (IOException sce) {
+ setIoException(sce);
+ }
+ }
}
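A note on the hunk above: the overridden validateResponse() records a failure through setIoException() instead of throwing, so a bad datanode only surfaces once the async ack is inspected. Below is a minimal sketch of that pattern under stated assumptions: checkAndExclude() is a hypothetical helper, while getCurrentPutBlkResponseFuture(), getIoException(), getDatanodeDetails() and ExcludeList#addDatanode() come from this patch and the existing HDDS client code.

    // Sketch only, not part of the commit: surface an async EC stream
    // failure and record its (single) datanode for exclusion.
    static void checkAndExclude(ECBlockOutputStream stream,
        ExcludeList excludeList) throws Exception {
      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> ack =
          stream.getCurrentPutBlkResponseFuture();
      if (ack != null) {
        ack.get(); // wait for the putBlock ack; validateResponse ran inside
      }
      if (stream.getIoException() != null) {
        // EC pipelines carry exactly one node, so the stream's datanode
        // is the node to avoid on the next allocateBlock call.
        excludeList.addDatanode(stream.getDatanodeDetails());
      }
    }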
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
index f3b2c0c..54e7238 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -179,6 +179,10 @@ public final class SCMContainerPlacementRackScatter
}
for (Node rack : toChooseRacks) {
+ if (rack == null) {
+ // TODO: need to recheck why null is coming here.
+ continue;
+ }
Node node = chooseNode(rack.getNetworkFullPath(), unavailableNodes,
metadataSizeRequired, dataSizeRequired);
if (node != null) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 95609b6..0cb7b0f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.security.token.Token;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -64,6 +65,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
private ECBlockOutputStream[] blockOutputStreams;
private int currentStreamIdx = 0;
+ private long successfulBlkGrpAckedLen;
@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
@@ -201,13 +203,15 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
return 0;
}
updateBlockID(underlyingBlockID());
- // Returning zero here. Underlying streams in EC entry are
- // ECBlockOutputStreams, extending from BlockOutputStream, without
- // overriding getTotalAckDataLength, and default implementation returns
- // constant zero, so even summarizing the return value of this method
- // from blockStreams entries would yield to 0. Once this changes, we need
- // to revisit this, and implement a proper sum of data or all streams.
- return 0;
+
+ return this.successfulBlkGrpAckedLen;
+ }
+
+ void updateBlockGroupToAckedPosition(long len) {
+ if (isWritingParity()) {
+ return;
+ }
+ this.successfulBlkGrpAckedLen += len;
}
/**
@@ -216,6 +220,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
* In EC entries the parity writes do not count into this, as the written
* data length represents the attempts of the classes using the entry, and
* not the attempts of the entry itself.
+ *
* @return 0 if the stream is not initialized, the amount of data bytes that
* were attempted to be written to the entry.
*/
@@ -267,7 +272,11 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
if (stream == null) {
continue;
}
- stream.executePutBlock(false, true);
+ try {
+ stream.executePutBlock(false, true);
+ } catch (Exception e) {
+ stream.setIoException(e);
+ }
}
}
@@ -291,8 +300,9 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
* futures if false.
* @return the streams that failed, empty if none failed.
*/
- public boolean checkStreamFailures(boolean forPutBlock) {
+ public List<ECBlockOutputStream> getFailedStreams(boolean forPutBlock) {
final Iterator<ECBlockOutputStream> iter = blockStreams().iterator();
+ List<ECBlockOutputStream> failedStreams = new ArrayList<>();
while (iter.hasNext()) {
final ECBlockOutputStream stream = iter.next();
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
@@ -305,10 +315,10 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
stream != null ? stream.getCurrentChunkResponseFuture() : null;
}
if (isFailed(stream, responseFuture)) {
- return true;
+ failedStreams.add(stream);
}
}
- return false;
+ return failedStreams;
}
private boolean isFailed(
@@ -397,13 +407,11 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
return this;
}
-
public ECBlockOutputStreamEntry.Builder setLength(long len) {
this.length = len;
return this;
}
-
public ECBlockOutputStreamEntry.Builder setBufferPool(BufferPool pool) {
this.bufferPool = pool;
return this;
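On the acked-length bookkeeping introduced in this file: getTotalAckDataLength() now reports successfulBlkGrpAckedLen, and updateBlockGroupToAckedPosition() deliberately ignores parity writes, since parity cells are not user data. A rough worked example of the accounting, assuming RS(3, 2) with a 1 MiB EC cell (the variable names are illustrative):

    // Sketch only, not part of the commit: per-stripe accounting for
    // RS(3, 2) with a 1 MiB cell. Five cells hit the wire per stripe,
    // but only the three data cells count toward the acked length.
    int dataBlocks = 3, parityBlocks = 2;
    int ecChunkSize = 1024 * 1024;
    int bytesOnWirePerStripe = (dataBlocks + parityBlocks) * ecChunkSize; // 5 MiB
    int bytesAckedPerStripe = dataBlocks * ecChunkSize;                   // 3 MiB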
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 24b79c7..f5480c7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -57,7 +58,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
private final int numParityBlks;
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
private final RawErasureEncoder encoder;
- private final ECReplicationConfig.EcCodec ecCodec;
private enum StripeWriteStatus {
SUCCESS,
@@ -100,7 +100,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
this.config = config;
// For EC, cell/chunk size and buffer size can be same for now.
ecChunkSize = replicationConfig.getEcChunkSize();
- this.ecCodec = replicationConfig.getCodec();
this.config.setStreamBufferMaxSize(ecChunkSize);
this.config.setStreamBufferFlushSize(ecChunkSize);
this.config.setStreamBufferSize(ecChunkSize);
@@ -218,7 +217,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
// Rollback the length/offset updated as part of this failed stripe write.
offset -= failedStripeDataSize;
blockOutputStreamEntryPool.getCurrentStreamEntry()
- .incCurrentPosition(-failedStripeDataSize);
+ .resetToAckedPosition();
// Let's close the current entry.
blockOutputStreamEntryPool.getCurrentStreamEntry().close();
@@ -256,14 +255,20 @@ public class ECKeyOutputStream extends KeyOutputStream {
boolean allocateBlockIfFull) throws IOException {
//check data blocks finished
//If index is > datanum blks
- int currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
- .getCurrentStreamIdx();
+ ECBlockOutputStreamEntry currentStreamEntry =
+ blockOutputStreamEntryPool.getCurrentStreamEntry();
+ int currentStreamIdx = currentStreamEntry.getCurrentStreamIdx();
if (currentStreamIdx == numDataBlks && lastDataBuffPos == ecChunkSize) {
//Lets encode and write
if (handleParityWrites(ecChunkSize,
allocateBlockIfFull) == StripeWriteStatus.FAILED) {
+ // TODO: This should make sure to retry until it succeeds. (HDDS-6036)
handleStripeFailure(ecChunkSize, numDataBlks * ecChunkSize);
}
+ // At this stage, the stripe write is successful.
+ currentStreamEntry.updateBlockGroupToAckedPosition(
+ currentStreamEntry.getCurrentPosition());
+
}
}
@@ -276,13 +281,19 @@ public class ECKeyOutputStream extends KeyOutputStream {
// TODO: we should alter the put block calls to share CRC to each stream.
ECBlockOutputStreamEntry streamEntry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
+ List<ECBlockOutputStream> failedStreams =
+ streamEntry.getFailedStreams(false);
// Since writes are async, let's check the failures once.
- if(streamEntry.checkStreamFailures(false)){
+ if (failedStreams.size() > 0) {
+ addToExcludeNodesList(failedStreams);
return StripeWriteStatus.FAILED;
}
streamEntry.executePutBlock();
+
+ failedStreams = streamEntry.getFailedStreams(true);
// Since putBlock is also async, let's check the failures again.
- if(streamEntry.checkStreamFailures(true)){
+ if (failedStreams.size() > 0) {
+ addToExcludeNodesList(failedStreams);
return StripeWriteStatus.FAILED;
}
ecChunkBufferCache.clear(parityCellSize);
@@ -299,6 +310,13 @@ public class ECKeyOutputStream extends KeyOutputStream {
return StripeWriteStatus.SUCCESS;
}
+ private void addToExcludeNodesList(List<ECBlockOutputStream> failedStreams) {
+ for (ECBlockOutputStream failedStream : failedStreams) {
+ blockOutputStreamEntryPool.getExcludeList()
+ .addDatanode(failedStream.getDatanodeDetails());
+ }
+ }
+
void writeParityCells(int parityCellSize) throws IOException {
final ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
ecChunkBufferCache.allocateParityBuffers(parityCellSize);
@@ -351,8 +369,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
try {
// Since it's a fullcell, let's write all content from buffer.
writeToOutputStream(current, len, bytesToWrite.array(),
- bytesToWrite.array().length, 0, current.getWrittenDataLength(),
- isParity);
+ bytesToWrite.array().length, 0, isParity);
} catch (Exception e) {
markStreamAsFailed(e);
}
@@ -360,29 +377,18 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
private int writeToOutputStream(BlockOutputStreamEntry current, long len,
- byte[] b, int writeLen, int off, long currentPos, boolean isParity)
+ byte[] b, int writeLen, int off, boolean isParity)
throws IOException {
try {
- current.write(b, off, writeLen);
if (!isParity) {
+ // In case of an exception while writing, this length will be rolled back
+ // as part of handleStripeFailure.
offset += writeLen;
}
+ current.write(b, off, writeLen);
} catch (IOException ioe) {
- // for the current iteration, totalDataWritten - currentPos gives the
- // amount of data already written to the buffer
-
- // In the retryPath, the total data to be written will always be equal
- // to or less than the max length of the buffer allocated.
- // The len specified here is the combined sum of the data length of
- // the buffers
- Preconditions.checkState(len <= config.getStreamBufferMaxSize());
- int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
- writeLen = dataWritten;
-
- if (!isParity) {
- offset += writeLen;
- }
- LOG.debug("writeLen {}, total len {}", writeLen, len);
+ LOG.debug("Exception:: writeLen: " + writeLen + ", total len:" + len,
+ ioe);
handleException(current, ioe);
}
return writeLen;
@@ -392,6 +398,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
IOException exception) throws IOException {
Throwable t = HddsClientUtils.checkForException(exception);
Preconditions.checkNotNull(t);
+ boolean containerExclusionException = checkIfContainerToExclude(t);
+ if (containerExclusionException) {
+ blockOutputStreamEntryPool.getExcludeList()
+ .addPipeline(streamEntry.getPipeline().getId());
+ }
// In EC, we will just close the current stream.
markStreamAsFailed(exception);
}
@@ -458,8 +469,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
try {
byte[] array = bytesToWrite.array();
writeToOutputStream(current, bytesToWrite.position(), array,
- bytesToWrite.position(), 0, current.getWrittenDataLength(),
- false);
+ bytesToWrite.position(), 0, false);
} catch (Exception e) {
markStreamAsFailed(e);
}
@@ -475,6 +485,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
// TODO: loop this until we succeed?
handleStripeFailure(parityCellSize, lastStripeSize);
}
+ blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .updateBlockGroupToAckedPosition(
+ blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .getCurrentPosition());
+
}
closeCurrentStreamEntry();
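The exclude-list entries recorded by addToExcludeNodesList() above matter because the pool's ExcludeList rides along on the next block allocation. A minimal sketch of that call, assuming the allocateBlock(OmKeyArgs, long, ExcludeList) overload that the existing non-EC retry path already uses; omClient, keyArgs and openID are illustrative names:

    // Sketch only, not part of the commit: the next block group is
    // requested with the populated exclude list, so SCM placement
    // skips the datanodes that just failed.
    ExcludeList excludeList = blockOutputStreamEntryPool.getExcludeList();
    OmKeyLocationInfo newBlock =
        omClient.allocateBlock(keyArgs, openID, excludeList);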
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 4e0fbf9..f7b323c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -431,7 +431,7 @@ public class KeyOutputStream extends OutputStream {
// Every container-specific exception from a datanode will be seen as
// StorageContainerException
- private boolean checkIfContainerToExclude(Throwable t) {
+ boolean checkIfContainerToExclude(Throwable t) {
return t instanceof StorageContainerException;
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index e8b3d11..3549532 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -88,12 +89,21 @@ public class TestECKeyOutputStream {
conf.setFromObject(clientConfig);
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ // If SCM detects a dead node too quickly, the container would be moved to
+ // the closed state and all in-progress writes would get exceptions. To
+ // avoid that, we keep a higher timeout; none of the tests currently
+ // depend on the dead node detection timeout.
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 300, TimeUnit.SECONDS);
+ conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
+ TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
+ TimeUnit.SECONDS);
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
.setTotalPipelineNumLimit(10).setBlockSize(blockSize)
.setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
@@ -205,12 +215,7 @@ public class TestECKeyOutputStream {
public void testMultipleChunksInSingleWriteOp(int numChunks)
throws IOException {
- byte[] inputData = new byte[numChunks * chunkSize];
- for (int i = 0; i < numChunks; i++) {
- int start = (i * chunkSize);
- Arrays.fill(inputData, start, start + chunkSize - 1,
- String.valueOf(i % 9).getBytes(UTF_8)[0]);
- }
+ byte[] inputData = getInputBytes(numChunks);
final OzoneBucket bucket = getOzoneBucket();
String keyName = "testMultipleChunksInSingleWriteOp" + numChunks;
try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
@@ -258,4 +263,61 @@ public class TestECKeyOutputStream {
}
return builder.toString().getBytes(UTF_8);
}
+
+ @Test
+ public void testWriteShouldSucceedWhenDNKilled() throws Exception {
+ int numChunks = 3;
+ byte[] inputData = getInputBytes(numChunks);
+ final OzoneBucket bucket = getOzoneBucket();
+ String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
+ try {
+ try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ chunkSize), new HashMap<>())) {
+ out.write(inputData);
+ // Kill a node from the first pipeline
+ DatanodeDetails nodeToKill =
+ ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
+ .get(0).getPipeline().getFirstNode();
+ cluster.shutdownHddsDatanode(nodeToKill);
+
+ out.write(inputData);
+ // Check the second blockGroup pipeline to make sure that the failed node
+ // is not selected.
+ Assert.assertFalse(
+ ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
+ .get(1).getPipeline().getNodes().contains(nodeToKill));
+ }
+
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ // TODO: this skip can be removed once read handles online recovery.
+ long skip = is.skip(inputData.length);
+ Assert.assertTrue(skip == inputData.length);
+ // All nodes are available in the second block group, so let's assert.
+ byte[] fileContent = new byte[inputData.length];
+ Assert.assertEquals(inputData.length, is.read(fileContent));
+ Assert.assertEquals(new String(inputData, UTF_8),
+ new String(fileContent, UTF_8));
+ }
+ } finally {
+ // TODO: optimize to just restart the killed DN.
+ resetCluster();
+ }
+ }
+
+ private void resetCluster() throws Exception {
+ cluster.shutdown();
+ init();
+ }
+
+ private byte[] getInputBytes(int numChunks) {
+ byte[] inputData = new byte[numChunks * chunkSize];
+ for (int i = 0; i < numChunks; i++) {
+ int start = (i * chunkSize);
+ Arrays.fill(inputData, start, start + chunkSize - 1,
+ String.valueOf(i % 9).getBytes(UTF_8)[0]);
+ }
+ return inputData;
+ }
+
}