You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2021/01/10 07:37:06 UTC

[ozone] branch master updated: HDDS-4524. Create freon test to measure closed container replication (#1635)

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 247dad5  HDDS-4524. Create freon test to measure closed container replication (#1635)
247dad5 is described below

commit 247dad577c644ee373e07f4b4a22a727a7c660b8
Author: Elek, Márton <el...@users.noreply.github.com>
AuthorDate: Sun Jan 10 08:36:48 2021 +0100

    HDDS-4524. Create freon test to measure closed container replication (#1635)
---
 .../replication/ReplicationSupervisor.java         |   5 +-
 .../hadoop/ozone/s3/OzoneClientProducer.java       |   2 +-
 .../hadoop/ozone/freon/BaseFreonGenerator.java     |   8 +
 .../ozone/freon/ClosedContainerReplicator.java     | 213 +++++++++++++++++++++
 .../java/org/apache/hadoop/ozone/freon/Freon.java  |   3 +-
 5 files changed, 227 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 1e9e647..6becf62 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -44,6 +44,7 @@ public class ReplicationSupervisor {
   private final ContainerSet containerSet;
   private final ContainerReplicator replicator;
   private final ExecutorService executor;
+
   private final AtomicLong requestCounter = new AtomicLong();
   private final AtomicLong successCounter = new AtomicLong();
   private final AtomicLong failureCounter = new AtomicLong();
@@ -116,10 +117,10 @@ public class ReplicationSupervisor {
     return containersInFlight.size();
   }
 
-  private final class TaskRunner implements Runnable {
+  public final class TaskRunner implements Runnable {
     private final ReplicationTask task;
 
-    private TaskRunner(ReplicationTask task) {
+    public TaskRunner(ReplicationTask task) {
       this.task = task;
     }
 
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
index 04bc950..bee6e65 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
@@ -36,6 +35,7 @@ import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.annotations.VisibleForTesting;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMTokenProto.Type.S3AUTHINFO;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INTERNAL_ERROR;
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
index 8bd410b..65096a6 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
@@ -491,4 +491,12 @@ public class BaseFreonGenerator {
       return OzoneClientFactory.getRpcClient(conf);
     }
   }
+
+  public void setTestNo(long testNo) {
+    this.testNo = testNo;
+  }
+
+  public void setThreadNo(int threadNo) {
+    this.threadNo = threadNo;
+  }
 }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
new file mode 100644
index 0000000..ad2810a
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
+import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask;
+import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+
+import com.codahale.metrics.Timer;
+import org.jetbrains.annotations.NotNull;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+/**
+ * Utility to replicate closed containers with datanode code.
+ */
+@Command(name = "cr",
+    aliases = "container-replicator",
+    description = "Replicate / download closed containers.",
+    versionProvider = HddsVersionProvider.class,
+    mixinStandardHelpOptions = true,
+    showDefaultValues = true)
+public class ClosedContainerReplicator extends BaseFreonGenerator implements
+    Callable<Void> {
+
+  @Option(names = {"--datanode"},
+      description = "Replicate only containers on this specific datanode.",
+      defaultValue = "")
+  private String datanode;
+
+  private ReplicationSupervisor supervisor;
+
+  private Timer timer;
+
+  private List<ReplicationTask> replicationTasks;
+
+  @Override
+  public Void call() throws Exception {
+
+    OzoneConfiguration conf = createOzoneConfiguration();
+
+    final Collection<String> datanodeStorageDirs =
+        MutableVolumeSet.getDatanodeStorageDirs(conf);
+
+    for (String dir : datanodeStorageDirs) {
+      checkDestinationDirectory(dir);
+    }
+
+    //logic same as the download+import on the destination datanode
+    initializeReplicationSupervisor(conf);
+
+    final ContainerOperationClient containerOperationClient =
+        new ContainerOperationClient(conf);
+
+    final List<ContainerInfo> containerInfos =
+        containerOperationClient.listContainer(0L, 1_000_000);
+
+    replicationTasks = new ArrayList<>();
+
+    for (ContainerInfo container : containerInfos) {
+
+      final ContainerWithPipeline containerWithPipeline =
+          containerOperationClient
+              .getContainerWithPipeline(container.getContainerID());
+
+      if (container.getState() == LifeCycleState.CLOSED) {
+
+        final List<DatanodeDetails> datanodesWithContainer =
+            containerWithPipeline.getPipeline().getNodes();
+
+        final List<String> datanodeUUIDs =
+            datanodesWithContainer
+                .stream().map(DatanodeDetails::getUuidString)
+                .collect(Collectors.toList());
+
+        //if datanode is specified, replicate only container if it has a
+        //replica.
+        if (datanode.isEmpty() || datanodeUUIDs.contains(datanode)) {
+          replicationTasks.add(new ReplicationTask(container.getContainerID(),
+              datanodesWithContainer));
+        }
+      }
+
+    }
+
+    //important: override the max number of tasks.
+    setTestNo(replicationTasks.size());
+
+    init();
+
+    timer = getMetrics().timer("replicate-container");
+    runTests(this::replicateContainer);
+    return null;
+  }
+
+  /**
+   * Check that the target directory is not re-used.
+   */
+  private void checkDestinationDirectory(String dirUrl) throws IOException {
+    final StorageLocation storageLocation = StorageLocation.parse(dirUrl);
+    final Path dirPath = Paths.get(storageLocation.getUri().getPath());
+
+    if (Files.notExists(dirPath)) {
+      return;
+    }
+
+    if (Files.list(dirPath).count() == 0) {
+      return;
+    }
+
+    throw new IllegalArgumentException(
+        "Configured storage directory " + dirUrl
+            + " (used as destination) should be empty");
+  }
+
+  @NotNull
+  private void initializeReplicationSupervisor(ConfigurationSource conf)
+      throws IOException {
+    String fakeDatanodeUuid = datanode;
+
+    if (fakeDatanodeUuid.isEmpty()) {
+      fakeDatanodeUuid = UUID.randomUUID().toString();
+    }
+
+    ContainerSet containerSet = new ContainerSet();
+
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+
+    MutableVolumeSet volumeSet = new MutableVolumeSet(fakeDatanodeUuid, conf);
+
+    Map<ContainerType, Handler> handlers = new HashMap<>();
+
+    for (ContainerType containerType : ContainerType.values()) {
+      final Handler handler =
+          Handler.getHandlerForContainerType(
+              containerType,
+              conf,
+              fakeDatanodeUuid,
+              containerSet,
+              volumeSet,
+              metrics,
+              containerReplicaProto -> {
+              });
+      handler.setScmID(UUID.randomUUID().toString());
+      handlers.put(containerType, handler);
+    }
+
+    ContainerController controller =
+        new ContainerController(containerSet, handlers);
+
+    ContainerReplicator replicator =
+        new DownloadAndImportReplicator(containerSet,
+            controller,
+            new SimpleContainerDownloader(conf, null),
+            new TarContainerPacker());
+
+    supervisor = new ReplicationSupervisor(containerSet, replicator, 10);
+  }
+
+  private void replicateContainer(long counter) throws Exception {
+    timer.time(() -> {
+      final ReplicationTask replicationTask =
+          replicationTasks.get((int) counter);
+      supervisor.new TaskRunner(replicationTask).run();
+      return null;
+    });
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index 1b03540..d3c5ae6 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -52,7 +52,8 @@ import picocli.CommandLine.Option;
         DatanodeBlockPutter.class,
         FollowerAppendLogEntryGenerator.class,
         ChunkManagerDiskWrite.class,
-        LeaderAppendLogEntryGenerator.class},
+        LeaderAppendLogEntryGenerator.class,
+        ClosedContainerReplicator.class},
     versionProvider = HddsVersionProvider.class,
     mixinStandardHelpOptions = true)
 public class Freon extends GenericCli {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org