You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2021/01/10 07:37:06 UTC
[ozone] branch master updated: HDDS-4524. Create freon test to
measure closed container replication (#1635)
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 247dad5 HDDS-4524. Create freon test to measure closed container replication (#1635)
247dad5 is described below
commit 247dad577c644ee373e07f4b4a22a727a7c660b8
Author: Elek, Márton <el...@users.noreply.github.com>
AuthorDate: Sun Jan 10 08:36:48 2021 +0100
HDDS-4524. Create freon test to measure closed container replication (#1635)
---
.../replication/ReplicationSupervisor.java | 5 +-
.../hadoop/ozone/s3/OzoneClientProducer.java | 2 +-
.../hadoop/ozone/freon/BaseFreonGenerator.java | 8 +
.../ozone/freon/ClosedContainerReplicator.java | 213 +++++++++++++++++++++
.../java/org/apache/hadoop/ozone/freon/Freon.java | 3 +-
5 files changed, 227 insertions(+), 4 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 1e9e647..6becf62 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -44,6 +44,7 @@ public class ReplicationSupervisor {
private final ContainerSet containerSet;
private final ContainerReplicator replicator;
private final ExecutorService executor;
+
private final AtomicLong requestCounter = new AtomicLong();
private final AtomicLong successCounter = new AtomicLong();
private final AtomicLong failureCounter = new AtomicLong();
@@ -116,10 +117,10 @@ public class ReplicationSupervisor {
return containersInFlight.size();
}
- private final class TaskRunner implements Runnable {
+ public final class TaskRunner implements Runnable {
private final ReplicationTask task;
- private TaskRunner(ReplicationTask task) {
+ public TaskRunner(ReplicationTask task) {
this.task = task;
}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
index 04bc950..bee6e65 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
@@ -36,6 +35,7 @@ import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import com.google.common.annotations.VisibleForTesting;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMTokenProto.Type.S3AUTHINFO;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INTERNAL_ERROR;
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
index 8bd410b..65096a6 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
@@ -491,4 +491,12 @@ public class BaseFreonGenerator {
return OzoneClientFactory.getRpcClient(conf);
}
}
+
+ public void setTestNo(long testNo) {
+ this.testNo = testNo;
+ }
+
+ public void setThreadNo(int threadNo) {
+ this.threadNo = threadNo;
+ }
}
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
new file mode 100644
index 0000000..ad2810a
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
+import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask;
+import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+
+import com.codahale.metrics.Timer;
+import org.jetbrains.annotations.NotNull;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+/**
+ * Utility to replicated closed container with datanode code.
+ */
+@Command(name = "cr",
+ aliases = "container-replicator",
+ description = "Replicate / download closed containers.",
+ versionProvider = HddsVersionProvider.class,
+ mixinStandardHelpOptions = true,
+ showDefaultValues = true)
+public class ClosedContainerReplicator extends BaseFreonGenerator implements
+ Callable<Void> {
+
+ @Option(names = {"--datanode"},
+ description = "Replicate only containers on this specific datanode.",
+ defaultValue = "")
+ private String datanode;
+
+ private ReplicationSupervisor supervisor;
+
+ private Timer timer;
+
+ private List<ReplicationTask> replicationTasks;
+
+ @Override
+ public Void call() throws Exception {
+
+ OzoneConfiguration conf = createOzoneConfiguration();
+
+ final Collection<String> datanodeStorageDirs =
+ MutableVolumeSet.getDatanodeStorageDirs(conf);
+
+ for (String dir : datanodeStorageDirs) {
+ checkDestinationDirectory(dir);
+ }
+
+ //logic same as the download+import on the destination datanode
+ initializeReplicationSupervisor(conf);
+
+ final ContainerOperationClient containerOperationClient =
+ new ContainerOperationClient(conf);
+
+ final List<ContainerInfo> containerInfos =
+ containerOperationClient.listContainer(0L, 1_000_000);
+
+ replicationTasks = new ArrayList<>();
+
+ for (ContainerInfo container : containerInfos) {
+
+ final ContainerWithPipeline containerWithPipeline =
+ containerOperationClient
+ .getContainerWithPipeline(container.getContainerID());
+
+ if (container.getState() == LifeCycleState.CLOSED) {
+
+ final List<DatanodeDetails> datanodesWithContainer =
+ containerWithPipeline.getPipeline().getNodes();
+
+ final List<String> datanodeUUIDs =
+ datanodesWithContainer
+ .stream().map(DatanodeDetails::getUuidString)
+ .collect(Collectors.toList());
+
+ //if datanode is specified, replicate only container if it has a
+ //replica.
+ if (datanode.isEmpty() || datanodeUUIDs.contains(datanode)) {
+ replicationTasks.add(new ReplicationTask(container.getContainerID(),
+ datanodesWithContainer));
+ }
+ }
+
+ }
+
+ //important: override the max number of tasks.
+ setTestNo(replicationTasks.size());
+
+ init();
+
+ timer = getMetrics().timer("replicate-container");
+ runTests(this::replicateContainer);
+ return null;
+ }
+
+ /**
+ * Check id target directory is not re-used.
+ */
+ private void checkDestinationDirectory(String dirUrl) throws IOException {
+ final StorageLocation storageLocation = StorageLocation.parse(dirUrl);
+ final Path dirPath = Paths.get(storageLocation.getUri().getPath());
+
+ if (Files.notExists(dirPath)) {
+ return;
+ }
+
+ if (Files.list(dirPath).count() == 0) {
+ return;
+ }
+
+ throw new IllegalArgumentException(
+ "Configured storage directory " + dirUrl
+ + " (used as destination) should be empty");
+ }
+
+ @NotNull
+ private void initializeReplicationSupervisor(ConfigurationSource conf)
+ throws IOException {
+ String fakeDatanodeUuid = datanode;
+
+ if (fakeDatanodeUuid.isEmpty()) {
+ fakeDatanodeUuid = UUID.randomUUID().toString();
+ }
+
+ ContainerSet containerSet = new ContainerSet();
+
+ ContainerMetrics metrics = ContainerMetrics.create(conf);
+
+ MutableVolumeSet volumeSet = new MutableVolumeSet(fakeDatanodeUuid, conf);
+
+ Map<ContainerType, Handler> handlers = new HashMap<>();
+
+ for (ContainerType containerType : ContainerType.values()) {
+ final Handler handler =
+ Handler.getHandlerForContainerType(
+ containerType,
+ conf,
+ fakeDatanodeUuid,
+ containerSet,
+ volumeSet,
+ metrics,
+ containerReplicaProto -> {
+ });
+ handler.setScmID(UUID.randomUUID().toString());
+ handlers.put(containerType, handler);
+ }
+
+ ContainerController controller =
+ new ContainerController(containerSet, handlers);
+
+ ContainerReplicator replicator =
+ new DownloadAndImportReplicator(containerSet,
+ controller,
+ new SimpleContainerDownloader(conf, null),
+ new TarContainerPacker());
+
+ supervisor = new ReplicationSupervisor(containerSet, replicator, 10);
+ }
+
+ private void replicateContainer(long counter) throws Exception {
+ timer.time(() -> {
+ final ReplicationTask replicationTask =
+ replicationTasks.get((int) counter);
+ supervisor.new TaskRunner(replicationTask).run();
+ return null;
+ });
+ }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index 1b03540..d3c5ae6 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -52,7 +52,8 @@ import picocli.CommandLine.Option;
DatanodeBlockPutter.class,
FollowerAppendLogEntryGenerator.class,
ChunkManagerDiskWrite.class,
- LeaderAppendLogEntryGenerator.class},
+ LeaderAppendLogEntryGenerator.class,
+ ClosedContainerReplicator.class},
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true)
public class Freon extends GenericCli {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org