Posted to commits@ozone.apache.org by ad...@apache.org on 2022/06/16 16:15:43 UTC
[ozone] branch master updated: HDDS-6806. EC: Implement the EC Reconstruction coordinator. (#3504)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef1df7b47 HDDS-6806. EC: Implement the EC Reconstruction coordinator. (#3504)
6ef1df7b47 is described below
commit 6ef1df7b4790a99986b4ee83c43286055701d10f
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Thu Jun 16 09:15:39 2022 -0700
HDDS-6806. EC: Implement the EC Reconstruction coordinator. (#3504)
---
.../hadoop/hdds/scm/XceiverClientManager.java | 33 ++
.../hdds/scm/storage/ECBlockOutputStream.java | 5 +
.../hdds/scm/storage/ContainerProtocolCalls.java | 19 +-
.../common/statemachine/DatanodeStateMachine.java | 10 +-
.../ReconstructECContainersCommandHandler.java | 5 +-
.../reconstruction/ECContainerOperationClient.java | 148 ++++++++
.../ECReconstructionCoordinator.java | 410 +++++++++++++++++++++
.../ECReconstructionCoordinatorTask.java | 39 ++
.../reconstruction/ECReconstructionSupervisor.java | 27 +-
.../ozone/container/keyvalue/KeyValueHandler.java | 1 +
.../TestECReconstructionSupervisor.java | 4 +-
.../hdds/scm/storage/TestContainerCommandsEC.java | 288 +++++++++++++--
12 files changed, 938 insertions(+), 51 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index ee4edecb3c..0724d614cc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -66,6 +66,7 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
LoggerFactory.getLogger(XceiverClientManager.class);
//TODO : change this to SCM configuration class
private final ConfigurationSource conf;
+ private final ScmClientConfig clientConfig;
private final Cache<String, XceiverClientSpi> clientCache;
private List<X509Certificate> caCerts;
@@ -88,6 +89,7 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
List<X509Certificate> caCerts) throws IOException {
Preconditions.checkNotNull(clientConf);
Preconditions.checkNotNull(conf);
+ this.clientConfig = clientConf;
long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
this.conf = conf;
this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
@@ -347,6 +349,37 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
this.maxSize = maxSize;
}
+ public void setStaleThreshold(long threshold) {
+ this.staleThreshold = threshold;
+ }
+
+ }
+
+ /**
+ * Builder of ScmClientConfig.
+ */
+ public static class XceiverClientManagerConfigBuilder {
+
+ private int maxCacheSize;
+ private long staleThresholdMs;
+
+ public XceiverClientManagerConfigBuilder setMaxCacheSize(int maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
+ return this;
+ }
+
+ public XceiverClientManagerConfigBuilder setStaleThresholdMs(
+ long staleThresholdMs) {
+ this.staleThresholdMs = staleThresholdMs;
+ return this;
+ }
+
+ public ScmClientConfig build() {
+ ScmClientConfig clientConfig = new ScmClientConfig();
+ clientConfig.setMaxSize(this.maxCacheSize);
+ clientConfig.setStaleThreshold(this.staleThresholdMs);
+ return clientConfig;
+ }
}
}
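The new nested builder lets callers construct a ScmClientConfig directly, outside the configuration-injection path. A minimal sketch of its use, mirroring the call site added in ECContainerOperationClient further below (conf and caCerts are assumed to be in scope):

    XceiverClientManager.ScmClientConfig clientConfig =
        new XceiverClientManager.XceiverClientManagerConfigBuilder()
            .setMaxCacheSize(256)           // cache at most 256 clients
            .setStaleThresholdMs(10 * 1000) // evict after 10s of staleness
            .build();
    XceiverClientManager manager =
        new XceiverClientManager(conf, clientConfig, caCerts);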
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 3f3fc12f2a..174e507829 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -80,6 +80,11 @@ public class ECBlockOutputStream extends BlockOutputStream {
writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
}
+ public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> write(
+ ByteBuffer buff) throws IOException {
+ return writeChunkToContainer(ChunkBuffer.wrap(buff));
+ }
+
public CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> executePutBlock(boolean close,
boolean force, long blockGroupLength) throws IOException {
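This ByteBuffer overload is what the reconstruction coordinator (below) uses to push decoded chunks to the target datanodes. A hedged sketch of the resulting write-then-check pattern, assuming targetStream and buffer are in scope and buffer holds one decoded chunk:

    // Write one chunk and wait for the datanode to ack it; a failed write
    // surfaces as an ExecutionException or via the stream's stored
    // IOException.
    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> reply =
        targetStream.write(buffer);
    reply.get();
    buffer.clear(); // ready for the next stripe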
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index b5365820e3..16bef69712 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -425,13 +425,15 @@ public final class ContainerProtocolCalls {
* @param client - client
* @param containerID - ID of container
* @param encodedToken - encodedToken if security is enabled
+ * @param replicaIndex - index position of the container replica
* @throws IOException
*/
@InterfaceStability.Evolving
public static void createRecoveringContainer(XceiverClientSpi client,
- long containerID, String encodedToken) throws IOException {
+ long containerID, String encodedToken, int replicaIndex)
+ throws IOException {
createContainerInternal(client, containerID, encodedToken,
- ContainerProtos.ContainerDataProto.State.RECOVERING);
+ ContainerProtos.ContainerDataProto.State.RECOVERING, replicaIndex);
}
/**
@@ -443,7 +445,7 @@ public final class ContainerProtocolCalls {
*/
public static void createContainer(XceiverClientSpi client, long containerID,
String encodedToken) throws IOException {
- createContainerInternal(client, containerID, encodedToken, null);
+ createContainerInternal(client, containerID, encodedToken, null, 0);
}
/**
* createContainer call that creates a container on the datanode.
@@ -451,18 +453,23 @@ public final class ContainerProtocolCalls {
* @param containerID - ID of container
* @param encodedToken - encodedToken if security is enabled
* @param state - state of the container
+ * @param replicaIndex - index position of the container replica
* @throws IOException
*/
private static void createContainerInternal(XceiverClientSpi client,
long containerID, String encodedToken,
- ContainerProtos.ContainerDataProto.State state) throws IOException {
+ ContainerProtos.ContainerDataProto.State state, int replicaIndex)
+ throws IOException {
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto.newBuilder();
- createRequest.setContainerType(ContainerProtos.ContainerType
- .KeyValueContainer);
+ createRequest
+ .setContainerType(ContainerProtos.ContainerType.KeyValueContainer);
if (state != null) {
createRequest.setState(state);
}
+ if (replicaIndex > 0) {
+ createRequest.setReplicaIndex(replicaIndex);
+ }
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
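With the extra parameter, a recovering container now carries its EC replica index; note the index is only set on the request when it is greater than zero. A minimal sketch, assuming dnClient is an acquired XceiverClientSpi and security is disabled (token null):

    // Create a RECOVERING container for EC replica index 4 on the
    // pipeline's datanode; passing 0 would leave the replica index unset.
    ContainerProtocolCalls.createRecoveringContainer(
        dnClient, containerID, null /* encodedToken */, 4);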
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ba21e5795e..cc05511b58 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Reco
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionSupervisor;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -181,9 +182,12 @@ public class DatanodeStateMachine implements Closeable {
replicationSupervisorMetrics =
ReplicationSupervisorMetrics.create(supervisor);
+ ECReconstructionCoordinator ecReconstructionCoordinator =
+ new ECReconstructionCoordinator(conf, certClient);
ecReconstructionSupervisor =
new ECReconstructionSupervisor(container.getContainerSet(), context,
- replicationConfig.getReplicationMaxStreams());
+ replicationConfig.getReplicationMaxStreams(),
+ ecReconstructionCoordinator);
// When we add new handlers just adding a new handler here should do the
@@ -389,6 +393,10 @@ public class DatanodeStateMachine implements Closeable {
Thread.currentThread().interrupt();
}
+ if (ecReconstructionSupervisor != null) {
+ ecReconstructionSupervisor.close();
+ }
+
if (connectionManager != null) {
connectionManager.close();
}
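Construction and teardown now pair up: the state machine builds a single coordinator, hands it to the supervisor, and the supervisor's new close() (see below) releases it on shutdown. A condensed sketch of that lifecycle, reusing the names from the surrounding class:

    ECReconstructionCoordinator coordinator =
        new ECReconstructionCoordinator(conf, certClient);
    ECReconstructionSupervisor supervisor =
        new ECReconstructionSupervisor(container.getContainerSet(), context,
            replicationConfig.getReplicationMaxStreams(), coordinator);
    // ... later, on shutdown:
    supervisor.close(); // closes the coordinator, then calls stop()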
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
index f4ec45f600..009d47d7af 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
@@ -53,8 +53,9 @@ public class ReconstructECContainersCommandHandler implements CommandHandler {
ecContainersCommand.getMissingContainerIndexes(),
ecContainersCommand.getSources(),
ecContainersCommand.getTargetDatanodes());
- this.supervisor.addTask(
- new ECReconstructionCoordinatorTask(reconstructionCommandInfo));
+ this.supervisor.addTask(new ECReconstructionCoordinatorTask(
+ this.supervisor.getReconstructionCoordinator(),
+ reconstructionCommandInfo));
}
@Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
new file mode 100644
index 0000000000..adebe11776
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ec.reconstruction;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * This class wraps the container-level RPC calls needed during
+ * EC offline reconstruction:
+ * - ListBlock / CloseContainer
+ * - CreateRecoveringContainer
+ */
+public class ECContainerOperationClient implements Closeable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ECContainerOperationClient.class);
+ private final XceiverClientManager xceiverClientManager;
+
+ public ECContainerOperationClient(XceiverClientManager clientManager) {
+ this.xceiverClientManager = clientManager;
+ }
+
+ public ECContainerOperationClient(ConfigurationSource conf,
+ CertificateClient certificateClient) throws IOException {
+ this(createClientManager(conf, certificateClient));
+ }
+
+ @NotNull
+ private static XceiverClientManager createClientManager(
+ ConfigurationSource conf, CertificateClient certificateClient)
+ throws IOException {
+ return new XceiverClientManager(conf,
+ new XceiverClientManager.XceiverClientManagerConfigBuilder()
+ .setMaxCacheSize(256).setStaleThresholdMs(10 * 1000).build(),
+ certificateClient != null ?
+ HAUtils.buildCAX509List(certificateClient, conf) :
+ null);
+ }
+
+ public BlockData[] listBlock(long containerId, DatanodeDetails dn,
+ ECReplicationConfig repConfig, Token<? extends TokenIdentifier> token)
+ throws IOException {
+ XceiverClientSpi xceiverClient = this.xceiverClientManager
+ .acquireClient(singleNodePipeline(dn, repConfig));
+ try {
+ List<ContainerProtos.BlockData> blockDataList = ContainerProtocolCalls
+ .listBlock(xceiverClient, containerId, null, Integer.MAX_VALUE, token)
+ .getBlockDataList();
+ return blockDataList.stream().map(i -> {
+ try {
+ return BlockData.getFromProtoBuf(i);
+ } catch (IOException e) {
+ LOG.debug("Failed while converting to protobuf BlockData. Returning"
+ + " null for listBlock from DN: " + dn,
+ e);
+ // TODO: revisit here.
+ return null;
+ }
+ }).collect(Collectors.toList())
+ .toArray(new BlockData[blockDataList.size()]);
+ } finally {
+ this.xceiverClientManager.releaseClient(xceiverClient, false);
+ }
+ }
+
+ public void closeContainer(long containerID, DatanodeDetails dn,
+ ECReplicationConfig repConfig, String encodedToken) throws IOException {
+ XceiverClientSpi xceiverClient = this.xceiverClientManager
+ .acquireClient(singleNodePipeline(dn, repConfig));
+ try {
+ ContainerProtocolCalls
+ .closeContainer(xceiverClient, containerID, encodedToken);
+ } finally {
+ this.xceiverClientManager.releaseClient(xceiverClient, false);
+ }
+ }
+
+ public void createRecoveringContainer(long containerID, DatanodeDetails dn,
+ ECReplicationConfig repConfig, String encodedToken, int replicaIndex)
+ throws IOException {
+ XceiverClientSpi xceiverClient = this.xceiverClientManager.acquireClient(
+ singleNodePipeline(dn, repConfig));
+ try {
+ ContainerProtocolCalls
+ .createRecoveringContainer(xceiverClient, containerID, encodedToken,
+ replicaIndex);
+ } finally {
+ this.xceiverClientManager.releaseClient(xceiverClient, false);
+ }
+ }
+
+ Pipeline singleNodePipeline(DatanodeDetails dn,
+ ECReplicationConfig repConfig) {
+ // To get the same client from the cache, we use the DN UUID as the
+ // pipeline ID for uniqueness. Note that the pipeline has no
+ // significance after it is closed, so any ID works here.
+ return Pipeline.newBuilder().setId(PipelineID.valueOf(dn.getUuid()))
+ .setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn))
+ .setState(Pipeline.PipelineState.CLOSED).build();
+ }
+
+ public XceiverClientManager getXceiverClientManager() {
+ return xceiverClientManager;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (xceiverClientManager != null) {
+ xceiverClientManager.close();
+ }
+ }
+}
\ No newline at end of file
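Taken together, this client covers the container-level half of reconstruction. A hypothetical end-to-end use for a single target replica, assuming security is off (certificate client and tokens null):

    static void recoverOn(ConfigurationSource conf, long containerID,
        DatanodeDetails target, ECReplicationConfig repConfig,
        int replicaIndex) throws IOException {
      try (ECContainerOperationClient client =
          new ECContainerOperationClient(conf, null)) {
        client.createRecoveringContainer(containerID, target, repConfig,
            null, replicaIndex);
        // ... write the reconstructed chunks to the target here ...
        client.closeContainer(containerID, target, repConfig, null);
      }
    }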
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
new file mode 100644
index 0000000000..780c520d72
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ec.reconstruction;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
+import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * The Coordinator implements the main flow of reconstructing
+ * missing container replicas.
+ * <p>
+ * For a container reconstruction task, the main flow is:
+ * - ListBlock from all healthy replicas
+ * - calculate effective block group len for all blocks
+ * - create RECOVERING containers in TargetDNs
+ * - for each block
+ * - build an ECBlockReconstructedStripeInputStream to read healthy chunks
+ * - build an ECBlockOutputStream to write out decoded chunks
+ * - for each stripe
+ * - use ECBlockReconstructedStripeInputStream.recoverChunks to decode chunks
+ * - use ECBlockOutputStream.write to write decoded chunks to TargetDNs
+ * - PutBlock
+ * - Close RECOVERING containers in TargetDNs
+ */
+public class ECReconstructionCoordinator implements Closeable {
+
+ static final Logger LOG =
+ LoggerFactory.getLogger(ECReconstructionCoordinator.class);
+
+ private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
+
+ private final ECContainerOperationClient containerOperationClient;
+
+ private final ConfigurationSource config;
+
+ private final ByteBufferPool byteBufferPool;
+
+ private ExecutorService ecReconstructExecutor;
+
+ private BlockInputStreamFactory blockInputStreamFactory;
+
+ public ECReconstructionCoordinator(ECContainerOperationClient containerClient,
+ ConfigurationSource conf, ByteBufferPool byteBufferPool,
+ ExecutorService reconstructExecutor,
+ BlockInputStreamFactory streamFactory) {
+ this.containerOperationClient = containerClient;
+ this.config = conf;
+ this.byteBufferPool = byteBufferPool;
+ this.blockInputStreamFactory = streamFactory;
+ this.ecReconstructExecutor = reconstructExecutor;
+ }
+
+ public ECReconstructionCoordinator(ConfigurationSource conf,
+ CertificateClient certificateClient) throws IOException {
+ this(new ECContainerOperationClient(conf, certificateClient), conf,
+ new ElasticByteBufferPool(), null, null);
+ this.ecReconstructExecutor =
+ new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
+ config.getObject(OzoneClientConfig.class)
+ .getEcReconstructStripeReadPoolLimit(), 60, TimeUnit.SECONDS,
+ new SynchronousQueue<>(), new ThreadFactoryBuilder()
+ .setNameFormat("ec-reconstruct-reader-TID-%d").build(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ this.blockInputStreamFactory = BlockInputStreamFactoryImpl
+ .getInstance(byteBufferPool, () -> ecReconstructExecutor);
+ }
+
+ public void reconstructECContainerGroup(long containerID,
+ ECReplicationConfig repConfig,
+ SortedMap<Integer, DatanodeDetails> sourceNodeMap,
+ SortedMap<Integer, DatanodeDetails> targetNodeMap) throws IOException {
+
+ Pipeline pipeline = rebuildInputPipeline(repConfig, sourceNodeMap);
+
+ SortedMap<Long, BlockData[]> blockDataMap =
+ getBlockDataMap(containerID, repConfig, sourceNodeMap);
+
+ SortedMap<Long, BlockLocationInfo> blockLocationInfoMap =
+ calcBlockLocationInfoMap(containerID, blockDataMap, pipeline);
+
+ // 1. create target recovering containers.
+ for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
+ .entrySet()) {
+ this.containerOperationClient
+ .createRecoveringContainer(containerID, indexDnPair.getValue(),
+ repConfig, null, indexDnPair.getKey());
+ }
+
+ // 2. Reconstruct and transfer to targets
+ for (BlockLocationInfo blockLocationInfo : blockLocationInfoMap.values()) {
+ reconstructECBlockGroup(blockLocationInfo, repConfig, targetNodeMap);
+ }
+
+ // 3. Close containers
+ for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
+ .entrySet()) {
+ DatanodeDetails dn = indexDnPair.getValue();
+ this.containerOperationClient
+ .closeContainer(containerID, dn, repConfig, null);
+ }
+
+ }
+
+ void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
+ ECReplicationConfig repConfig,
+ SortedMap<Integer, DatanodeDetails> targetMap)
+ throws IOException {
+ long safeBlockGroupLength = blockLocationInfo.getLength();
+ List<Integer> missingContainerIndexes = new ArrayList<>(targetMap.keySet());
+
+ // calculate the real missing block indexes
+ int dataLocs = ECBlockInputStreamProxy
+ .expectedDataLocations(repConfig, safeBlockGroupLength);
+ List<Integer> toReconstructIndexes = new ArrayList<>();
+ for (Integer index : missingContainerIndexes) {
+ if (index <= dataLocs || index > repConfig.getData()) {
+ toReconstructIndexes.add(index);
+ }
+ // else padded indexes.
+ }
+
+ // Nothing to reconstruct in this block group: the missing locations
+ // held only padding blocks (indexes beyond the effective data length),
+ // so there is no real data to recover.
+ if (toReconstructIndexes.size() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping the reconstruction for the block: "
+ + blockLocationInfo.getBlockID() + ". In the missing locations: "
+ + missingContainerIndexes
+ + ", this block group has only padded blocks.");
+ }
+ return;
+ }
+
+ try (ECBlockReconstructedStripeInputStream sis
+ = new ECBlockReconstructedStripeInputStream(
+ repConfig, blockLocationInfo, true,
+ this.containerOperationClient.getXceiverClientManager(), null,
+ this.blockInputStreamFactory, byteBufferPool,
+ this.ecReconstructExecutor)) {
+
+ ECBlockOutputStream[] targetBlockStreams =
+ new ECBlockOutputStream[toReconstructIndexes.size()];
+ ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
+ OzoneClientConfig configuration = new OzoneClientConfig();
+ // TODO: avoid unnecessary bufferPool creation. This pool is not actually
+ // used in EC flows, but some code paths still depend on a buffer pool.
+ BufferPool bufferPool =
+ new BufferPool(configuration.getStreamBufferSize(),
+ (int) (configuration.getStreamBufferMaxSize() / configuration
+ .getStreamBufferSize()),
+ ByteStringConversion.createByteBufferConversion(false));
+ for (int i = 0; i < toReconstructIndexes.size(); i++) {
+ DatanodeDetails datanodeDetails =
+ targetMap.get(toReconstructIndexes.get(i));
+ targetBlockStreams[i] =
+ new ECBlockOutputStream(blockLocationInfo.getBlockID(),
+ this.containerOperationClient.getXceiverClientManager(),
+ this.containerOperationClient
+ .singleNodePipeline(datanodeDetails, repConfig), bufferPool,
+ configuration, blockLocationInfo.getToken());
+ bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
+ // Make sure it's clean. Don't want to reuse the erroneously returned
+ // buffers from the pool.
+ bufs[i].clear();
+ }
+
+ sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
+ .collect(Collectors.toSet()));
+ long length = safeBlockGroupLength;
+ while (length > 0) {
+ int readLen = sis.recoverChunks(bufs);
+ // TODO: can be submitted in parallel
+ for (int i = 0; i < bufs.length; i++) {
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ future = targetBlockStreams[i].write(bufs[i]);
+ checkFailures(targetBlockStreams[i], future);
+ bufs[i].clear();
+ }
+ length -= readLen;
+ }
+
+ try {
+ for (ECBlockOutputStream targetStream : targetBlockStreams) {
+ targetStream
+ .executePutBlock(true, true, blockLocationInfo.getLength());
+ checkFailures(targetStream,
+ targetStream.getCurrentPutBlkResponseFuture());
+ }
+ } finally {
+ for (ByteBuffer buf : bufs) {
+ byteBufferPool.putBuffer(buf);
+ }
+ IOUtils.cleanupWithLogger(LOG, targetBlockStreams);
+ }
+ }
+ }
+
+ private void checkFailures(ECBlockOutputStream targetBlockStream,
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ currentPutBlkResponseFuture)
+ throws IOException {
+ if (isFailed(targetBlockStream, currentPutBlkResponseFuture)) {
+ // If one chunk response failed, we should retry, and if it
+ // still fails after retrying, declare the reconstruction
+ // as failed.
+ // For now, just throw the exception.
+ throw new IOException(
+ "Chunk write failed at the new target node: " + targetBlockStream
+ .getDatanodeDetails() + ". Aborting the reconstruction process.");
+ }
+ }
+
+ private boolean isFailed(ECBlockOutputStream outputStream,
+ CompletableFuture<ContainerProtos.
+ ContainerCommandResponseProto> chunkWriteResponseFuture) {
+ if (chunkWriteResponseFuture == null) {
+ return true;
+ }
+
+ ContainerProtos.ContainerCommandResponseProto
+ containerCommandResponseProto = null;
+ try {
+ containerCommandResponseProto = chunkWriteResponseFuture.get();
+ } catch (InterruptedException e) {
+ outputStream.setIoException(e);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ outputStream.setIoException(e);
+ }
+
+ if (outputStream.getIoException() != null) {
+ return true;
+ }
+
+ if (containerCommandResponseProto == null) {
+ return true;
+ }
+
+ return false;
+ }
+
+ SortedMap<Long, BlockLocationInfo> calcBlockLocationInfoMap(long containerID,
+ SortedMap<Long, BlockData[]> blockDataMap, Pipeline pipeline) {
+
+ SortedMap<Long, BlockLocationInfo> blockInfoMap = new TreeMap<>();
+
+ for (Map.Entry<Long, BlockData[]> entry : blockDataMap.entrySet()) {
+ Long localID = entry.getKey();
+ BlockData[] blockGroup = entry.getValue();
+
+ long blockGroupLen = calcEffectiveBlockGroupLen(blockGroup,
+ pipeline.getReplicationConfig().getRequiredNodes());
+ if (blockGroupLen > 0) {
+ BlockLocationInfo blockLocationInfo = new BlockLocationInfo.Builder()
+ .setBlockID(new BlockID(containerID, localID))
+ .setLength(blockGroupLen).setPipeline(pipeline).build();
+ blockInfoMap.put(localID, blockLocationInfo);
+ }
+ }
+ return blockInfoMap;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (containerOperationClient != null) {
+ containerOperationClient.close();
+ }
+ }
+
+ private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
+ SortedMap<Integer, DatanodeDetails> sourceNodeMap) {
+
+ List<DatanodeDetails> nodes = new ArrayList<>(sourceNodeMap.values());
+ Map<DatanodeDetails, Integer> dnVsIndex = new HashMap<>();
+
+ Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+ sourceNodeMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+ Integer key = next.getKey();
+ DatanodeDetails value = next.getValue();
+ dnVsIndex.put(value, key);
+ }
+
+ return Pipeline.newBuilder().setId(PipelineID.randomId())
+ .setReplicationConfig(repConfig).setNodes(nodes)
+ .setReplicaIndexes(dnVsIndex).setState(Pipeline.PipelineState.CLOSED)
+ .build();
+ }
+
+ private SortedMap<Long, BlockData[]> getBlockDataMap(long containerID,
+ ECReplicationConfig repConfig,
+ Map<Integer, DatanodeDetails> sourceNodeMap) throws IOException {
+
+ SortedMap<Long, BlockData[]> resultMap = new TreeMap<>();
+
+ Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+ sourceNodeMap.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+ Integer index = next.getKey();
+ DatanodeDetails dn = next.getValue();
+
+ BlockData[] blockDataArr =
+ containerOperationClient.listBlock(containerID, dn, repConfig, null);
+
+ for (BlockData blockData : blockDataArr) {
+ BlockID blockID = blockData.getBlockID();
+ BlockData[] blkDataArr = resultMap.getOrDefault(blockData.getLocalID(),
+ new BlockData[repConfig.getRequiredNodes()]);
+ blkDataArr[index - 1] = blockData;
+ resultMap.put(blockID.getLocalID(), blkDataArr);
+ }
+ }
+ return resultMap;
+ }
+
+ /**
+ * Get the effective length of the block group.
+ * We cannot be absolutely accurate when a stripe in this block has
+ * failed, since the failed cells could be missing and the healthy
+ * cells cannot tell us whether the last stripe failed or not. In
+ * that case we recover at most one extra stripe for this block,
+ * which does not confuse the client's view of the data.
+ *
+ * @param blockGroup block data per replica index (null where missing)
+ * @param replicaCount expected number of replicas in the group
+ * @return the effective block group length, 0 if none was recorded
+ */
+ private long calcEffectiveBlockGroupLen(BlockData[] blockGroup,
+ int replicaCount) {
+ Preconditions.checkState(blockGroup.length == replicaCount);
+
+ long blockGroupLen = Long.MAX_VALUE;
+
+ for (int i = 0; i < replicaCount; i++) {
+ if (blockGroup[i] == null) {
+ continue;
+ }
+
+ String putBlockLenStr = blockGroup[i].getMetadata()
+ .get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
+ long putBlockLen = (putBlockLenStr == null) ?
+ Long.MAX_VALUE :
+ Long.parseLong(putBlockLenStr);
+ // Use the min to be conservative
+ blockGroupLen = Math.min(putBlockLen, blockGroupLen);
+ }
+ return blockGroupLen == Long.MAX_VALUE ? 0 : blockGroupLen;
+ }
+}
\ No newline at end of file
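Driving the coordinator directly looks roughly like the sketch below; the production path goes through ECReconstructionCoordinatorTask, shown next. Both maps are keyed by 1-based replica index: sources hold the surviving replicas, targets name the datanodes that should receive the rebuilt ones.

    static void reconstruct(ConfigurationSource conf,
        CertificateClient certClient, long containerID,
        ECReplicationConfig repConfig,
        SortedMap<Integer, DatanodeDetails> sources,
        SortedMap<Integer, DatanodeDetails> targets) throws IOException {
      // The coordinator owns an ECContainerOperationClient; closing it
      // releases the underlying XceiverClientManager as well.
      try (ECReconstructionCoordinator coordinator =
          new ECReconstructionCoordinator(conf, certClient)) {
        coordinator.reconstructECContainerGroup(containerID, repConfig,
            sources, targets);
      }
    }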
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
index 24168e5f69..af4a87e272 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
@@ -17,14 +17,30 @@
*/
package org.apache.hadoop.ozone.container.ec.reconstruction;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
/**
* This is the actual EC reconstruction coordination task.
*/
public class ECReconstructionCoordinatorTask implements Runnable {
+ static final Logger LOG =
+ LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class);
+ private ECReconstructionCoordinator reconstructionCoordinator;
private ECReconstructionCommandInfo reconstructionCommandInfo;
public ECReconstructionCoordinatorTask(
+ ECReconstructionCoordinator coordinator,
ECReconstructionCommandInfo reconstructionCommandInfo) {
+ this.reconstructionCoordinator = coordinator;
this.reconstructionCommandInfo = reconstructionCommandInfo;
}
@@ -42,6 +58,29 @@ public class ECReconstructionCoordinatorTask implements Runnable {
// 4. Write the recovered chunks to given targets/write locally to
// respective container. HDDS-6582
// 5. Close/finalize the recovered containers.
+
+ SortedMap<Integer, DatanodeDetails> sourceNodeMap =
+ reconstructionCommandInfo.getSources().stream().collect(Collectors
+ .toMap(DatanodeDetailsAndReplicaIndex::getReplicaIndex,
+ DatanodeDetailsAndReplicaIndex::getDnDetails, (v1, v2) -> v1,
+ TreeMap::new));
+ SortedMap<Integer, DatanodeDetails> targetNodeMap = IntStream
+ .range(0, reconstructionCommandInfo.getTargetDatanodes().size()).boxed()
+ .collect(Collectors.toMap(i -> (int) reconstructionCommandInfo
+ .getMissingContainerIndexes()[i],
+ i -> reconstructionCommandInfo.getTargetDatanodes().get(i),
+ (v1, v2) -> v1, TreeMap::new));
+
+ try {
+ reconstructionCoordinator.reconstructECContainerGroup(
+ reconstructionCommandInfo.getContainerID(),
+ reconstructionCommandInfo.getEcReplicationConfig(), sourceNodeMap,
+ targetNodeMap);
+ } catch (IOException e) {
+ LOG.warn(
+ "Failed to complete the reconstruction task for the container: "
+ + reconstructionCommandInfo.getContainerID(), e);
+ }
}
@Override
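The stream pipelines above compress a simple pairing; in expanded form the target map is built like this (a sketch, assuming getMissingContainerIndexes() returns a byte[], consistent with the cast above):

    SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
    byte[] missing = reconstructionCommandInfo.getMissingContainerIndexes();
    List<DatanodeDetails> targets =
        reconstructionCommandInfo.getTargetDatanodes();
    for (int i = 0; i < targets.size(); i++) {
      // the i-th missing replica index is rebuilt on the i-th target DN
      targetNodeMap.put((int) missing[i], targets.get(i));
    }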
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
index e2c930a8c2..2af04f6500 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
@@ -21,6 +21,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -30,28 +32,33 @@ import java.util.concurrent.TimeUnit;
* This class is to handle all the EC reconstruction tasks to be scheduled as
* they arrive.
*/
-public class ECReconstructionSupervisor {
+public class ECReconstructionSupervisor implements Closeable {
private final ContainerSet containerSet;
private final StateContext context;
private final ExecutorService executor;
+ private final ECReconstructionCoordinator reconstructionCoordinator;
public ECReconstructionSupervisor(ContainerSet containerSet,
- StateContext context, ExecutorService executor) {
+ StateContext context, ExecutorService executor,
+ ECReconstructionCoordinator coordinator) {
this.containerSet = containerSet;
this.context = context;
this.executor = executor;
+ this.reconstructionCoordinator = coordinator;
}
public ECReconstructionSupervisor(ContainerSet containerSet,
- StateContext context, int poolSize) {
+ StateContext context, int poolSize,
+ ECReconstructionCoordinator coordinator) {
// TODO: ReplicationSupervisor and this class can be refactored to have a
// common interface.
this(containerSet, context,
new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("ECContainerReconstructionThread-%d").build()));
+ .setNameFormat("ECContainerReconstructionThread-%d").build()),
+ coordinator);
}
public void stop() {
@@ -69,4 +76,16 @@ public class ECReconstructionSupervisor {
public void addTask(ECReconstructionCoordinatorTask task) {
executor.execute(task);
}
+
+ @Override
+ public void close() throws IOException {
+ if (reconstructionCoordinator != null) {
+ reconstructionCoordinator.close();
+ }
+ stop();
+ }
+
+ public ECReconstructionCoordinator getReconstructionCoordinator() {
+ return reconstructionCoordinator;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 7fcbdb3e7f..9590711650 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -283,6 +283,7 @@ public class KeyValueHandler extends Handler {
}
newContainerData.setReplicaIndex(request.getCreateContainer()
.getReplicaIndex());
+
// TODO: Add support to add metadataList to ContainerData. Add metadata
// to container during creation.
KeyValueContainer newContainer = new KeyValueContainer(
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
index e86be82a09..2403364f4c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeoutException;
public class TestECReconstructionSupervisor {
private final ECReconstructionSupervisor supervisor =
- new ECReconstructionSupervisor(null, null, 5);
+ new ECReconstructionSupervisor(null, null, 5, null);
@Test
public void testAddTaskShouldExecuteTheGivenTask()
@@ -42,7 +42,7 @@ public class TestECReconstructionSupervisor {
private boolean isExecuted = false;
FakeTask(ECReconstructionCommandInfo reconstructionCommandInfo) {
- super(reconstructionCommandInfo);
+ super(null, reconstructionCommandInfo);
}
@Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index 76cfb7b289..3047d18d7e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -47,10 +48,16 @@ import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECContainerOperationClient;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -58,6 +65,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.io.OutputStream;
@@ -65,10 +74,16 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -79,7 +94,7 @@ public class TestContainerCommandsEC {
private static MiniOzoneCluster cluster;
private static StorageContainerManager scm;
- private static OzoneClient client;
+ private static OzoneClient rpcClient;
private static ObjectStore store;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@@ -90,17 +105,16 @@ public class TestContainerCommandsEC {
private static final EcCodec EC_CODEC = EcCodec.RS;
private static final int EC_CHUNK_SIZE = 1024;
private static final int STRIPE_DATA_SIZE = EC_DATA * EC_CHUNK_SIZE;
- private static final int NUM_DN = EC_DATA + EC_PARITY;
+ private static final int NUM_DN = EC_DATA + EC_PARITY + 3;
+ private static byte[][] inputChunks = new byte[EC_DATA][EC_CHUNK_SIZE];
// Each key size will be in range [min, max), min inclusive, max exclusive
- private static final int[][] KEY_SIZE_RANGES = new int[][]{
- {1, EC_CHUNK_SIZE},
- {EC_CHUNK_SIZE, EC_CHUNK_SIZE + 1},
- {EC_CHUNK_SIZE + 1, STRIPE_DATA_SIZE},
- {STRIPE_DATA_SIZE, STRIPE_DATA_SIZE + 1},
- {STRIPE_DATA_SIZE + 1, STRIPE_DATA_SIZE + EC_CHUNK_SIZE},
- {STRIPE_DATA_SIZE + EC_CHUNK_SIZE, STRIPE_DATA_SIZE * 2},
- };
+ private static final int[][] KEY_SIZE_RANGES =
+ new int[][] {{1, EC_CHUNK_SIZE}, {EC_CHUNK_SIZE, EC_CHUNK_SIZE + 1},
+ {EC_CHUNK_SIZE + 1, STRIPE_DATA_SIZE},
+ {STRIPE_DATA_SIZE, STRIPE_DATA_SIZE + 1},
+ {STRIPE_DATA_SIZE + 1, STRIPE_DATA_SIZE + EC_CHUNK_SIZE},
+ {STRIPE_DATA_SIZE + EC_CHUNK_SIZE, STRIPE_DATA_SIZE * 2}};
private static byte[][] values;
private static long containerID;
private static Pipeline pipeline;
@@ -117,6 +131,7 @@ public class TestContainerCommandsEC {
OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
startCluster(config);
prepareData(KEY_SIZE_RANGES);
+ rpcClient = OzoneClientFactory.getRpcClient(config);
}
@AfterAll
@@ -130,13 +145,11 @@ public class TestContainerCommandsEC {
Map<DatanodeDetails, Integer> indicesForSinglePipeline = new HashMap<>();
indicesForSinglePipeline.put(node, replicaIndex);
- return Pipeline.newBuilder()
- .setId(ecPipeline.getId())
+ return Pipeline.newBuilder().setId(ecPipeline.getId())
.setReplicationConfig(ecPipeline.getReplicationConfig())
.setState(ecPipeline.getPipelineState())
.setNodes(ImmutableList.of(node))
- .setReplicaIndexes(indicesForSinglePipeline)
- .build();
+ .setReplicaIndexes(indicesForSinglePipeline).build();
}
@BeforeEach
@@ -175,22 +188,27 @@ public class TestContainerCommandsEC {
public void testListBlock() throws Exception {
for (int i = 0; i < datanodeDetails.size(); i++) {
final int minKeySize = i < EC_DATA ? i * EC_CHUNK_SIZE : 0;
- final int numExpectedBlocks = (int) Arrays.stream(values)
- .mapToInt(v -> v.length).filter(s -> s > minKeySize).count();
+ final int numExpectedBlocks =
+ (int) Arrays.stream(values).mapToInt(v -> v.length)
+ .filter(s -> s > minKeySize).count();
Function<Integer, Integer> expectedChunksFunc = chunksInReplicaFunc(i);
- final int numExpectedChunks = Arrays.stream(values)
- .mapToInt(v -> v.length).map(expectedChunksFunc::apply).sum();
+ final int numExpectedChunks =
+ Arrays.stream(values).mapToInt(v -> v.length)
+ .map(expectedChunksFunc::apply).sum();
if (numExpectedBlocks == 0) {
final int j = i;
Throwable t = Assertions.assertThrows(StorageContainerException.class,
- () -> ContainerProtocolCalls.listBlock(clients.get(j),
- containerID, null, numExpectedBlocks + 1, null));
- Assertions.assertEquals(
- "ContainerID " + containerID + " does not exist", t.getMessage());
+ () -> ContainerProtocolCalls
+ .listBlock(clients.get(j), containerID, null,
+ numExpectedBlocks + 1, null));
+ Assertions
+ .assertEquals("ContainerID " + containerID + " does not exist",
+ t.getMessage());
continue;
}
- ListBlockResponseProto response = ContainerProtocolCalls.listBlock(
- clients.get(i), containerID, null, numExpectedBlocks + 1, null);
+ ListBlockResponseProto response = ContainerProtocolCalls
+ .listBlock(clients.get(i), containerID, null, numExpectedBlocks + 1,
+ null);
Assertions.assertEquals(numExpectedBlocks, response.getBlockDataCount(),
"blocks count doesn't match on DN " + i);
Assertions.assertEquals(numExpectedChunks,
@@ -228,7 +246,7 @@ public class TestContainerCommandsEC {
//Create the recovering container in DN.
ContainerProtocolCalls.createRecoveringContainer(dnClient,
- container.containerID().getProtobuf().getId(), null);
+ container.containerID().getProtobuf().getId(), null, 4);
BlockID blockID = ContainerTestHelper
.getTestBlockID(container.containerID().getProtobuf().getId());
@@ -278,6 +296,207 @@ public class TestContainerCommandsEC {
}
}
+ private static byte[] getBytesWith(int singleDigitNumber, int total) {
+ StringBuilder builder = new StringBuilder(singleDigitNumber);
+ for (int i = 1; i <= total; i++) {
+ builder.append(singleDigitNumber);
+ }
+ return builder.toString().getBytes(UTF_8);
+ }
+
+ @ParameterizedTest
+ @MethodSource("recoverableMissingIndexes")
+ void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
+ throws Exception {
+ testECReconstructionCoordinator(missingIndexes);
+ }
+
+ static Stream<List<Integer>> recoverableMissingIndexes() {
+ return Stream
+ .concat(IntStream.rangeClosed(1, 5).mapToObj(ImmutableList::of), Stream
+ .of(ImmutableList.of(2, 3), ImmutableList.of(2, 4),
+ ImmutableList.of(3, 5), ImmutableList.of(4, 5)));
+ }
+
+ /**
+ * Tests reconstruction when more replicas than the parity count are
+ * missing; the test should throw InsufficientLocationsException.
+ */
+ @Test
+ public void testECReconstructionCoordinatorWithMissingIndexes135() {
+ InsufficientLocationsException exception =
+ Assert.assertThrows(InsufficientLocationsException.class, () -> {
+ testECReconstructionCoordinator(ImmutableList.of(1, 3, 5));
+ });
+
+ String expectedMessage =
+ "There are insufficient datanodes to read the EC block";
+ String actualMessage = exception.getMessage();
+
+ Assert.assertEquals(expectedMessage, actualMessage);
+ }
+
+ private void testECReconstructionCoordinator(List<Integer> missingIndexes)
+ throws Exception {
+ ObjectStore objectStore = rpcClient.getObjectStore();
+ String keyString = UUID.randomUUID().toString();
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = volumeName;
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ for (int i = 0; i < EC_DATA; i++) {
+ inputChunks[i] = getBytesWith(i + 1, EC_CHUNK_SIZE);
+ }
+ XceiverClientManager xceiverClientManager =
+ new XceiverClientManager(config);
+ try (OzoneOutputStream out = bucket.createKey(keyString, 4096,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, 1024),
+ new HashMap<>())) {
+ Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
+ for (int i = 0; i < inputChunks.length; i++) {
+ out.write(inputChunks[i]);
+ }
+ }
+
+ ECReconstructionCoordinator coordinator =
+ new ECReconstructionCoordinator(config, null);
+
+ OzoneKeyDetails key = bucket.getKey(keyString);
+ long conID = key.getOzoneKeyLocations().get(0).getContainerID();
+
+ //Close the container first.
+ scm.getContainerManager().getContainerStateManager().updateContainerState(
+ HddsProtos.ContainerID.newBuilder().setId(conID).build(),
+ HddsProtos.LifeCycleEvent.FINALIZE);
+ scm.getContainerManager().getContainerStateManager().updateContainerState(
+ HddsProtos.ContainerID.newBuilder().setId(conID).build(),
+ HddsProtos.LifeCycleEvent.CLOSE);
+
+ Pipeline containerPipeline = scm.getPipelineManager().getPipeline(
+ scm.getContainerManager().getContainer(ContainerID.valueOf(conID))
+ .getPipelineID());
+
+ SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
+
+ List<DatanodeDetails> nodeSet = containerPipeline.getNodes();
+ List<Pipeline> containerToDeletePipeline = new ArrayList<>();
+ for (DatanodeDetails srcDn : nodeSet) {
+ int replIndex = containerPipeline.getReplicaIndex(srcDn);
+ if (missingIndexes.contains(replIndex)) {
+ containerToDeletePipeline
+ .add(createSingleNodePipeline(containerPipeline, srcDn, replIndex));
+ continue;
+ }
+ sourceNodeMap.put(replIndex, srcDn);
+ }
+
+ //Find nodes outside of pipeline
+ List<DatanodeDetails> clusterDnsList =
+ cluster.getHddsDatanodes().stream().map(k -> k.getDatanodeDetails())
+ .collect(Collectors.toList());
+ List<DatanodeDetails> targetNodes = new ArrayList<>();
+ for (DatanodeDetails clusterDN : clusterDnsList) {
+ if (!nodeSet.contains(clusterDN)) {
+ targetNodes.add(clusterDN);
+ if (targetNodes.size() == missingIndexes.size()) {
+ break;
+ }
+ }
+ }
+
+ Assert.assertEquals(missingIndexes.size(), targetNodes.size());
+
+ List<org.apache.hadoop.ozone.container.common.helpers.BlockData[]>
+ blockDataArrList = new ArrayList<>();
+ for (int j = 0; j < containerToDeletePipeline.size(); j++) {
+ org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData =
+ new ECContainerOperationClient(new OzoneConfiguration(), null)
+ .listBlock(conID, containerToDeletePipeline.get(j).getFirstNode(),
+ (ECReplicationConfig) containerToDeletePipeline.get(j)
+ .getReplicationConfig(), null);
+ blockDataArrList.add(blockData);
+ // Delete the first index container
+ ContainerProtocolCalls.deleteContainer(
+ xceiverClientManager.acquireClient(containerToDeletePipeline.get(j)),
+ conID, true, null);
+ }
+
+ //Give the new target to reconstruct the container
+ SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
+ for (int k = 0; k < missingIndexes.size(); k++) {
+ targetNodeMap.put(missingIndexes.get(k), targetNodes.get(k));
+ }
+
+ coordinator.reconstructECContainerGroup(conID,
+ (ECReplicationConfig) containerPipeline.getReplicationConfig(),
+ sourceNodeMap, targetNodeMap);
+
+ // Assert the original container metadata with the new recovered container.
+ Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+ targetNodeMap.entrySet().iterator();
+ int i = 0;
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+ DatanodeDetails targetDN = next.getValue();
+ Map<DatanodeDetails, Integer> indexes = new HashMap<>();
+ // map the current target DN to its replica index
+ indexes.put(targetDN, next.getKey());
+ Pipeline newTargetPipeline =
+ Pipeline.newBuilder().setId(PipelineID.randomId())
+ .setReplicationConfig(containerPipeline.getReplicationConfig())
+ .setReplicaIndexes(indexes)
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setNodes(ImmutableList.of(targetDN)).build();
+
+ org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+ reconstructedBlockData =
+ new ECContainerOperationClient(new OzoneConfiguration(), null)
+ .listBlock(conID, newTargetPipeline.getFirstNode(),
+ (ECReplicationConfig) newTargetPipeline
+ .getReplicationConfig(), null);
+ Assert.assertEquals(blockDataArrList.get(i).length,
+ reconstructedBlockData.length);
+ checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
+ ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
+ ContainerProtocolCalls.readContainer(
+ xceiverClientManager.acquireClient(newTargetPipeline), conID,
+ null);
+ Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+ readContainerResponseProto.getContainerData().getState());
+ i++;
+ }
+
+ }
+
+ private void checkBlockData(
+ org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
+ org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+ reconstructedBlockData) {
+
+ for (int i = 0; i < blockData.length; i++) {
+ List<ContainerProtos.ChunkInfo> oldBlockDataChunks =
+ blockData[i].getChunks();
+ List<ContainerProtos.ChunkInfo> newBlockDataChunks =
+ reconstructedBlockData[i].getChunks();
+ for (int j = 0; j < oldBlockDataChunks.size(); j++) {
+ ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
+ if (chunkInfo.getLen() == 0) {
+ // let's ignore the empty chunks
+ continue;
+ }
+ Assert.assertEquals(chunkInfo, newBlockDataChunks.get(j));
+ }
+ }
+
+ /* Assert.assertEquals(
+ Arrays.stream(blockData).map(b -> b.getProtoBufMessage())
+ .collect(Collectors.toList()),
+ Arrays.stream(reconstructedBlockData).map(b -> b.getProtoBufMessage())
+ .collect(Collectors.toList()));*/
+ }
+
public static void startCluster(OzoneConfiguration conf) throws Exception {
// Set minimum pipeline to 1 to ensure all data is written to
@@ -287,15 +506,12 @@ public class TestContainerCommandsEC {
writableECContainerProviderConfig.setMinimumPipelines(1);
conf.setFromObject(writableECContainerProviderConfig);
- cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(NUM_DN)
- .setScmId(SCM_ID)
- .setClusterId(CLUSTER_ID)
- .build();
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(NUM_DN)
+ .setScmId(SCM_ID).setClusterId(CLUSTER_ID).build();
cluster.waitForClusterToBeReady();
scm = cluster.getStorageContainerManager();
- client = OzoneClientFactory.getRpcClient(conf);
- store = client.getObjectStore();
+ rpcClient = OzoneClientFactory.getRpcClient(conf);
+ store = rpcClient.getObjectStore();
storageContainerLocationClient =
cluster.getStorageContainerLocationClient();
}
@@ -314,8 +530,8 @@ public class TestContainerCommandsEC {
int keySize = RandomUtils.nextInt(ranges[i][0], ranges[i][1]);
values[i] = RandomUtils.nextBytes(keySize);
final String keyName = UUID.randomUUID().toString();
- try (OutputStream out = bucket.createKey(
- keyName, values[i].length, repConfig, new HashMap<>())) {
+ try (OutputStream out = bucket
+ .createKey(keyName, values[i].length, repConfig, new HashMap<>())) {
out.write(values[i]);
}
}
@@ -330,8 +546,8 @@ public class TestContainerCommandsEC {
}
public static void stopCluster() throws IOException {
- if (client != null) {
- client.close();
+ if (rpcClient != null) {
+ rpcClient.close();
}
if (storageContainerLocationClient != null) {