Posted to commits@ozone.apache.org by sh...@apache.org on 2021/11/30 09:53:10 UTC

[ozone] branch master updated: HDDS-5702. SCM remote benchmark tool. (#2600)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b0604d  HDDS-5702. SCM remote benchmark tool. (#2600)
5b0604d is described below

commit 5b0604d7f52be986f5b9405e503050959f448f24
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Tue Nov 30 17:52:52 2021 +0800

    HDDS-5702. SCM remote benchmark tool. (#2600)
    
    * HDDS-5702. SCM remote benchmark tool.
    
    * fix findbugs problems
    
    * fix checkstyle
    
    * More descriptions on the configs.
    
    * retrigger ci
---
 .../java/org/apache/hadoop/ozone/freon/Freon.java  |   3 +-
 .../hadoop/ozone/freon/SCMThroughputBenchmark.java | 908 +++++++++++++++++++++
 2 files changed, 910 insertions(+), 1 deletion(-)
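
For reference, a typical invocation of the new subcommand (name
"scm-throughput-benchmark", alias "stb", registered under Freon in the diff
below) might look like the following sketch; the `ozone freon` entry point
and the example host are assumptions, while the option names come straight
from the code:

    ozone freon scm-throughput-benchmark \
        --benchmark AllocateBlocks \
        --scmHost 10.0.0.1 \
        --num-blocks 100000 \
        --block-size 4096 \
        --num-threads 8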

diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index 74a74d7..c5b9a3a 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -62,7 +62,8 @@ import picocli.CommandLine.Option;
         GeneratorScm.class,
         GeneratorDatanode.class,
         ClosedContainerReplicator.class,
-        StreamingGenerator.class},
+        StreamingGenerator.class,
+        SCMThroughputBenchmark.class},
     versionProvider = HddsVersionProvider.class,
     mixinStandardHelpOptions = true)
 public class Freon extends GenericCli {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SCMThroughputBenchmark.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SCMThroughputBenchmark.java
new file mode 100644
index 0000000..9c1472e
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/SCMThroughputBenchmark.java
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageTypeProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmRpcRetryCount;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmRpcRetryInterval;
+
+/**
+ * Benchmark for SCM throughput.
+ * - allocate blocks (ops)
+ * - allocate containers (ops)
+ * - process reports (container reports only) (ops)
+ *
+ * Remember to add the following configs to your ozone-site.xml:
+ * - ozone.scm.heartbeat.thread.interval: 1h
+ * - hdds.heartbeat.interval: 1h
+ * - ozone.scm.stale.node.interval: 1d
+ * - ozone.scm.dead.node.interval: 2d
+ * These settings keep the fake datanodes from being marked stale or dead
+ * for the duration of the benchmark.
+ */
+@CommandLine.Command(name = "scm-throughput-benchmark",
+    aliases = "stb",
+    description = "Benchmark for SCM throughput.",
+    versionProvider = HddsVersionProvider.class,
+    mixinStandardHelpOptions = true,
+    showDefaultValues = true)
+public final class SCMThroughputBenchmark implements Callable<Void> {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMThroughputBenchmark.class);
+
+  /**
+   * Type of benchmarks.
+   */
+  public enum BenchmarkType {
+    AllocateBlocks,
+    AllocateContainers,
+    ProcessReports,
+  }
+
+  @CommandLine.ParentCommand
+  private Freon freon;
+
+  @CommandLine.Option(names = {"--benchmark"},
+      description = "Which type of benchmark to run " +
+          "(AllocateBlocks, AllocateContainers, ProcessReports).",
+      required = true,
+      defaultValue = "")
+  private String benchmarkType = "";
+
+  @CommandLine.Option(names = {"--num-blocks"},
+      description = "Number of blocks.",
+      defaultValue = "1000")
+  private int numBlocks = 1000;
+
+  @CommandLine.Option(names = {"--block-size"},
+      description = "Block size.",
+      defaultValue = "4096")
+  private long blockSize = 4096;
+
+  @CommandLine.Option(names = {"--num-containers"},
+      description = "Number of containers.",
+      defaultValue = "100")
+  private int numContainers = 100;
+
+  @CommandLine.Option(names = {"--num-datanodes"},
+      description = "Number of fake datanodes.",
+      defaultValue = "10")
+  private int numDatanodes = 10;
+
+  @CommandLine.Option(names = {"--num-threads"},
+      description = "Number of scm client threads.",
+      defaultValue = "4")
+  private int numThreads = 4;
+
+  @CommandLine.Option(names = {"--num-heartbeats"},
+      description = "Number of heartbeats that carries reports.",
+      defaultValue = "4")
+  private int numHeartbeats = 4;
+
+  @CommandLine.Option(names = {"--scmHost"},
+      required = true,
+      description = "The leader scm host x.x.x.x.")
+  private String scm;
+
+  static final int CHECK_INTERVAL_MILLIS = 5000;
+
+  private static final Random RANDOM = new Random();
+
+  private OzoneConfiguration conf;
+
+  private List<FakeDatanode> datanodes;
+
+  private StorageContainerDatanodeProtocol datanodeScmClient;
+
+  private StorageContainerLocationProtocol scmContainerClient;
+
+  private ScmBlockLocationProtocol scmBlockClient;
+
+  private SCMThroughputBenchmark() {
+  }
+
+  @Override
+  public Void call() throws Exception {
+    ThroughputBenchmark benchmark = createBenchmark();
+
+    initCluster();
+
+    benchmark.run();
+
+    return null;
+  }
+
+  private ThroughputBenchmark createBenchmark() {
+    ThroughputBenchmark benchmark = null;
+    BenchmarkType type = BenchmarkType.valueOf(benchmarkType);
+    switch (type) {
+    case AllocateBlocks:
+      benchmark = new BlockBenchmark(numThreads, numBlocks, blockSize);
+      break;
+    case AllocateContainers:
+      benchmark = new ContainerBenchmark(numThreads, numContainers);
+      break;
+    case ProcessReports:
+      benchmark = new ReportBenchmark(numDatanodes, numContainers,
+          numHeartbeats);
+      break;
+    default:
+      throw new IllegalArgumentException(benchmarkType +
+          " is not a valid benchmarkType.");
+    }
+
+    LOG.info("Run benchmark: {}", type);
+
+    return benchmark;
+  }
+
+  private void initCluster() throws IOException, InterruptedException,
+      IllegalArgumentException {
+
+    this.conf = freon.createOzoneConfiguration();
+
+    initSCMClients();
+
+    registerFakeDatanodes();
+
+    activatePipelines();
+
+    exitSafeMode();
+  }
+
+  private void initSCMClients() throws IOException {
+    datanodeScmClient = createDatanodeScmClient();
+
+    scmContainerClient = createScmContainerClient();
+
+    scmBlockClient = createScmBlockClient();
+
+    LOG.info("Initialized scm clients " +
+        "{datanodeClient, containerClient, blockClient}");
+  }
+
+  private void registerFakeDatanodes() throws IOException {
+    datanodes = new ArrayList<>();
+
+    for (int i = 0; i < numDatanodes; i++) {
+      FakeDatanode dn = new FakeDatanode();
+      dn.register();
+      datanodes.add(dn);
+    }
+
+    LOG.info("Registered datanode(fake): {}", numDatanodes);
+  }
+
+  /**
+   * Activate pipelines manually, since we only have fake datanodes
+   * that don't react to SCM commands.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void activatePipelines() throws IOException, InterruptedException {
+    LOG.info("Waiting for pipelines to be allocated automatically");
+
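+    // Give SCM's background pipeline creation time to finish before listing.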
+    Thread.sleep(Duration.ofSeconds(60).toMillis());
+
+    List<Pipeline> pipelines = scmContainerClient.listPipelines();
+
+    for (Pipeline pipeline : pipelines) {
+      scmContainerClient.activatePipeline(pipeline.getId().getProtobuf());
+    }
+
+    LOG.info("Force opened pipelines: {}", pipelines.size());
+  }
+
+  /**
+   * Exit safe mode manually so that we can do block allocations.
+   * @throws IOException
+   */
+  private void exitSafeMode() throws IOException {
+    try {
+      if (!scmContainerClient.forceExitSafeMode()) {
+        throw new IOException("Safe mode exit failed");
+      }
+    } catch (IOException e) {
+      LOG.warn("Force safe mode exit threw an exception; continuing", e);
+    }
+
+    try {
+      if (scmContainerClient.inSafeMode()) {
+        throw new IOException("Safe mode exit failed indeed");
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to exit safe mode", e);
+      throw e;
+    }
+
+    LOG.info("Force exited safe mode");
+  }
+
+  private StorageContainerDatanodeProtocol createDatanodeScmClient()
+      throws IOException {
+    int dnPort = conf.getInt(OZONE_SCM_DATANODE_PORT_KEY,
+        OZONE_SCM_DATANODE_PORT_DEFAULT);
+    InetSocketAddress scmAddress = NetUtils.createSocketAddr(scm, dnPort);
+
+    Configuration hadoopConfig =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
+    RPC.setProtocolEngine(
+        hadoopConfig,
+        StorageContainerDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long version =
+        RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+
+    SCMClientConfig scmClientConfig = conf.getObject(SCMClientConfig.class);
+    int rpcTimeout = (int) scmClientConfig.getRpcTimeOut();
+
+    RetryPolicy retryPolicy =
+        RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+            getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
+            TimeUnit.MILLISECONDS);
+
+    StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
+        StorageContainerDatanodeProtocolPB.class, version,
+        scmAddress, UserGroupInformation.getCurrentUser(), hadoopConfig,
+        NetUtils.getDefaultSocketFactory(hadoopConfig), rpcTimeout,
+        retryPolicy).getProxy();
+
+    return new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
+  }
+
+  private StorageContainerLocationProtocol createScmContainerClient() {
+    return HAUtils.getScmContainerClient(conf);
+  }
+
+  private ScmBlockLocationProtocol createScmBlockClient() {
+    return HAUtils.getScmBlockClient(conf);
+  }
+
+  /**
+   * Base class for all benchmark types.
+   */
+  private abstract class ThroughputBenchmark {
+
+    private static final String DURATION_FORMAT = "HH:mm:ss,SSS";
+
+    private long startTime;
+    private long execTime;
+    private String formattedTime;
+    private int numThreads;
+    private Queue<Runnable> taskQueue;
+    private ExecutorService executor;
+
+    ThroughputBenchmark(int threads) {
+      this.numThreads = threads;
+      this.taskQueue = new LinkedList<>();
+      this.executor = Executors.newFixedThreadPool(this.numThreads);
+    }
+
+    public void run() throws InterruptedException {
+      prepare();
+
+      execTasks();
+
+      showSummary();
+    }
+
+    public void prepare() {
+      LOG.info("Preparing tasks to run");
+    }
+
+    public void execTasks() throws InterruptedException {
+      setStartTime(System.nanoTime());
+
+      LOG.info("Benchmark tasks started");
+
+      for (int i = 0; i < this.numThreads; i++) {
+        this.executor.execute(taskQueue.poll());
+      }
+
+      waitForComplete();
+
+      this.executor.shutdown();
+      this.executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+    }
+
+    public void showSummary() {
+      execTime = System.nanoTime() - this.startTime;
+      formattedTime = DurationFormatUtils.formatDuration(
+          TimeUnit.NANOSECONDS.toMillis(execTime),
+          DURATION_FORMAT);
+    }
+
+    public abstract void waitForComplete() throws InterruptedException;
+
+    protected long getStartTime() {
+      return this.startTime;
+    }
+
+    protected void setStartTime(long time) {
+      this.startTime = time;
+    }
+
+    protected long getExecTime() {
+      return this.execTime;
+    }
+
+    protected String getFormattedTime() {
+      return this.formattedTime;
+    }
+
+    protected int getNumThreads() {
+      return this.numThreads;
+    }
+
+    protected void enqueueTask(Runnable task) {
+      this.taskQueue.add(task);
+    }
+  }
+
+  /**
+   * Benchmarks the throughput of the allocate-block operation
+   * issued by SCM clients.
+   */
+  private class BlockBenchmark extends ThroughputBenchmark {
+
+    private AtomicLong totalBlockCounter;
+    private AtomicLong succBlockCounter;
+    private AtomicLong failBlockCounter;
+    private int totalBlocks;
+    private long blockSize;
+
+    BlockBenchmark(int threads, int blocks, long blockSize) {
+      super(threads);
+      this.totalBlocks = blocks;
+      this.blockSize = blockSize;
+      this.totalBlockCounter = new AtomicLong();
+      this.succBlockCounter = new AtomicLong();
+      this.failBlockCounter = new AtomicLong();
+    }
+
+    @Override
+    public void prepare() {
+      super.prepare();
+      for (int i = 0; i < getNumThreads(); i++) {
+        enqueueTask(new BlockTask(this.blockSize));
+      }
+    }
+
+    @Override
+    public void showSummary() {
+      super.showSummary();
+
+      long execSecs = TimeUnit.SECONDS.convert(getExecTime(),
+          TimeUnit.NANOSECONDS);
+      long blocks = succBlockCounter.get();
+      float blocksPerSec = execSecs != 0 ? (float) blocks / execSecs : blocks;
+
+      System.out.println("***************************************");
+      System.out.printf("Total allocated blocks: %d%n",
+          succBlockCounter.get());
+      System.out.printf("Total failed blocks: %d%n",
+          failBlockCounter.get());
+      System.out.printf("Execution Time: %s%n", getFormattedTime());
+      System.out.printf("Throughput: %f (ops)%n", blocksPerSec);
+      System.out.println("***************************************");
+    }
+
+    @Override
+    public void waitForComplete() throws InterruptedException {
+      while (totalBlockCounter.get() < this.totalBlocks) {
+        Thread.sleep(CHECK_INTERVAL_MILLIS);
+        LOG.info("Blocks allocated: {}/{}",
+            totalBlockCounter.get(), this.totalBlocks);
+      }
+    }
+
+    private void doAllocateBlock(long size, int nBlocks,
+        ReplicationConfig config) {
+      try {
+        scmBlockClient.allocateBlock(size, nBlocks, config, "STB",
+            new ExcludeList());
+        succBlockCounter.incrementAndGet();
+      } catch (IOException e) {
+        LOG.error("Block allocation failed", e);
+        failBlockCounter.incrementAndGet();
+      }
+    }
+
+    private class BlockTask implements Runnable {
+
+      private long blockSize;
+
+      BlockTask(long blockSize) {
+        this.blockSize = blockSize;
+      }
+
+      @Override
+      public void run() {
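+        // Atomically claim block slots from the shared counter so the
+        // threads together attempt exactly totalBlocks allocations.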
+        while (totalBlockCounter.getAndIncrement() < totalBlocks) {
+          doAllocateBlock(blockSize, 1,
+              new RatisReplicationConfig(ReplicationFactor.THREE));
+        }
+      }
+    }
+  }
+
+  /**
+   * Benchmarks the throughput of the allocate-container operation,
+   * which normally happens as a side effect of allocate-block.
+   * We benchmark it separately to see how container allocation scales.
+   */
+  private class ContainerBenchmark extends ThroughputBenchmark {
+
+    private AtomicInteger totalContainerCounter;
+    private AtomicInteger succContainerCounter;
+    private AtomicInteger failContainerCounter;
+    private int totalContainers;
+
+    ContainerBenchmark(int threads, int containers) {
+      super(threads);
+      this.totalContainers = containers;
+      this.totalContainerCounter = new AtomicInteger();
+      this.succContainerCounter = new AtomicInteger();
+      this.failContainerCounter = new AtomicInteger();
+    }
+
+    @Override
+    public void prepare() {
+      super.prepare();
+      for (int i = 0; i < getNumThreads(); i++) {
+        enqueueTask(new ContainerTask());
+      }
+    }
+
+    @Override
+    public void showSummary() {
+      super.showSummary();
+
+      long execSecs = TimeUnit.SECONDS.convert(getExecTime(),
+          TimeUnit.NANOSECONDS);
+      long containers = succContainerCounter.get();
+      float containersPerSec = execSecs != 0 ?
+          (float)containers / execSecs : containers;
+
+      System.out.println("***************************************");
+      System.out.printf("Total allocated containers: %d%n",
+          succContainerCounter.get());
+      System.out.printf("Total failed containers: %d%n",
+          failContainerCounter.get());
+      System.out.printf("Execution Time: %s%n", getFormattedTime());
+      System.out.printf("Throughput: %f (ops)%n", containersPerSec);
+      System.out.println("***************************************");
+    }
+
+    @Override
+    public void waitForComplete() throws InterruptedException {
+      while (totalContainerCounter.get() < this.totalContainers) {
+        Thread.sleep(CHECK_INTERVAL_MILLIS);
+        LOG.info("Containers allocated: {}/{}",
+            totalContainerCounter.get(), this.totalContainers);
+      }
+    }
+
+    private void doAllocateContainer(ReplicationFactor factor) {
+      try {
+        scmContainerClient.allocateContainer(
+            ReplicationType.RATIS, factor, "STB");
+        succContainerCounter.incrementAndGet();
+      } catch (IOException e) {
+        LOG.error("Container allocation failed", e);
+        failContainerCounter.incrementAndGet();
+      }
+    }
+
+    private class ContainerTask implements Runnable {
+
+      ContainerTask() {
+      }
+
+      @Override
+      public void run() {
+        while (totalContainerCounter.getAndIncrement() < totalContainers) {
+          doAllocateContainer(ReplicationFactor.THREE);
+        }
+      }
+    }
+  }
+
+  /**
+   * Benchmarks the throughput of container report processing in SCM.
+   * 1. Create containers.
+   * 2. Send container reports built from the created containers.
+   */
+  private class ReportBenchmark extends ThroughputBenchmark {
+
+    private static final String REPORT_PREFIX =
+        "scm_container_manager_metrics_num_container_reports_processed_";
+    private static final String REPORT_SUCC_KEY = REPORT_PREFIX + "successful";
+    private static final String REPORT_FAIL_KEY = REPORT_PREFIX + "failed";
+
+    private AtomicInteger succReportSendCounter;
+    private AtomicInteger failReportSendCounter;
+    private int reportSuccProcessedOnRegister;
+    private int reportFailProcessedOnRegister;
+    private int succReportsProcessed;
+    private int failReportsProcessed;
+    private int numReports;
+    private int numReportRounds;
+    private int totalContainers;
+    private int containersPerReport;
+    private List<ContainerInfo> containers;
+
+    ReportBenchmark(int threads, int containers, int rounds) {
+      super(threads);
+      this.succReportSendCounter = new AtomicInteger();
+      this.failReportSendCounter = new AtomicInteger();
+      this.numReportRounds = rounds;
+      this.numReports = numDatanodes * rounds;
+      this.totalContainers = containers;
+      this.containers = new ArrayList<>();
+      this.reportSuccProcessedOnRegister = 0;
+      this.reportFailProcessedOnRegister = 0;
+    }
+
+    /**
+     * Prepare containers on the SCM side for reports.
+     */
+    @Override
+    public void prepare() {
+      // Allocate containers.
+      // We intentionally use RATIS/ONE so that we don't have to
+      // distribute container replicas across the fake datanodes.
+
+      LOG.info("Preparing containers for reports");
+
+      for (int i = 0; i < totalContainers; i++) {
+        try {
+          ContainerWithPipeline container = scmContainerClient
+              .allocateContainer(ReplicationType.RATIS, ReplicationFactor.ONE,
+                  "STB");
+          containers.add(container.getContainerInfo());
+        } catch (IOException e) {
+          LOG.error("Container allocation failed", e);
+        }
+      }
+
+      LOG.info("Allocated containers: {}", containers.size());
+
+      // build container reports based on container info returned
+      totalContainers = containers.size();
+      containersPerReport = totalContainers / numDatanodes;
+      int from = 0;
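+      // Hand each fake datanode a contiguous slice of the containers;
+      // the thread count equals the datanode count for this benchmark.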
+      for (int i = 0; i < getNumThreads(); i++) {
+        datanodes.get(i).buildContainerReports(containers.subList(
+            from, Math.min(from + containersPerReport, totalContainers)));
+        from += containersPerReport;
+      }
+
+      getScmReportProcessed();
+      reportSuccProcessedOnRegister = this.succReportsProcessed;
+      reportFailProcessedOnRegister = this.failReportsProcessed;
+      this.succReportsProcessed = 0;
+      this.failReportsProcessed = 0;
+
+      super.prepare();
+      for (int i = 0; i < getNumThreads(); i++) {
+        enqueueTask(new ReportTask(datanodes.get(i), numReportRounds));
+      }
+    }
+
+    @Override
+    public void showSummary() {
+      super.showSummary();
+
+      long execSecs = TimeUnit.SECONDS.convert(getExecTime(),
+          TimeUnit.NANOSECONDS);
+      float reportsPerSec = execSecs > 0 ?
+          (float) succReportsProcessed / execSecs : succReportsProcessed;
+
+      System.out.println("***************************************");
+      System.out.printf("Total container reports processed: %d%n",
+          succReportsProcessed);
+      System.out.printf("Total container reports failed: %d%n",
+          failReportsProcessed);
+      System.out.printf("Containers per report: %d%n", containersPerReport);
+      System.out.printf("Execution Time: %s%n", getFormattedTime());
+      System.out.printf("Throughput: %f (ops)%n", reportsPerSec);
+      System.out.println("***************************************");
+    }
+
+    @Override
+    public void waitForComplete() throws InterruptedException {
+      while (succReportsProcessed + failReportsProcessed < numReports) {
+        Thread.sleep(CHECK_INTERVAL_MILLIS);
+        LOG.info("Processing reports: ({}+{})/{}",
+            succReportsProcessed, failReportsProcessed, numReports);
+        getScmReportProcessed();
+      }
+    }
+
+    private void getScmReportProcessed() {
+      List<String> metricsLines = getScmReportProcessedMetricsLines();
+      for (String line : metricsLines) {
+        if (line.startsWith("#")) {
+          continue;
+        }
+        if (line.startsWith(REPORT_SUCC_KEY)) {
+          this.succReportsProcessed = Integer.parseInt(getMetricsValue(line))
+              - this.reportSuccProcessedOnRegister;
+        }
+        if (line.startsWith(REPORT_FAIL_KEY)) {
+          this.failReportsProcessed = Integer.parseInt(getMetricsValue(line))
+              - this.reportFailProcessedOnRegister;
+        }
+      }
+    }
+
+    private List<String> getScmReportProcessedMetricsLines() {
+      HttpClient client = HttpClientBuilder.create().build();
+      String uri = String.format("http://%s:%d/prom", scm,
+          ScmConfigKeys.OZONE_SCM_HTTP_BIND_PORT_DEFAULT);
+      HttpGet get = new HttpGet(uri);
+      try {
+        HttpResponse execute = client.execute(get);
+        if (execute.getStatusLine().getStatusCode() != 200) {
+          throw new RuntimeException(
+              "Can't read prometheus metrics endpoint" + execute.getStatusLine()
+                  .getStatusCode());
+        }
+        try (BufferedReader bufferedReader = new BufferedReader(
+            new InputStreamReader(execute.getEntity().getContent(),
+                StandardCharsets.UTF_8))) {
+          return bufferedReader.lines().collect(Collectors.toList());
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    String getMetricsValue(String line) {
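+      // Prometheus text format: "<metric_name> <value>"; take the value.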
+      String[] parts = line.split(" ");
+      return parts[1];
+    }
+
+    private class ReportTask implements Runnable {
+
+      private FakeDatanode datanode;
+      private int rounds;
+
+      ReportTask(FakeDatanode datanode, int rounds) {
+        this.datanode = datanode;
+        this.rounds = rounds;
+      }
+
+      @Override
+      public void run() {
+        for (int i = 0; i < rounds; i++) {
+          try {
+            datanode.sendHeartbeat();
+            succReportSendCounter.incrementAndGet();
+          } catch (IOException e) {
+            LOG.error("Heartbeat failed", e);
+            failReportSendCounter.incrementAndGet();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * This class simulates the register and heartbeat behavior
+   * of a real Datanode, but runs no actual daemons.
+   */
+  private class FakeDatanode {
+    private DatanodeDetails datanodeDetails;
+    private ContainerReportsProto containerReport;
+
+    FakeDatanode() {
+      datanodeDetails = createRandomDatanodeDetails();
+      containerReport = null;
+    }
+
+    public void register() throws IOException {
+      SCMRegisteredResponseProto response = datanodeScmClient.register(
+          datanodeDetails.getExtendedProtoBufMessage(),
+          createNodeReport(datanodeDetails.getUuid()),
+          createContainerReport(),
+          createPipelineReport(),
+          UpgradeUtils.defaultLayoutVersionProto());
+
+      if (response.hasHostname() && response.hasIpAddress()) {
+        datanodeDetails.setHostName(response.getHostname());
+        datanodeDetails.setIpAddress(response.getIpAddress());
+      }
+      if (response.hasNetworkName() && response.hasNetworkLocation()) {
+        datanodeDetails.setNetworkName(response.getNetworkName());
+        datanodeDetails.setNetworkLocation(response.getNetworkLocation());
+      }
+    }
+
+    public void sendHeartbeat() throws IOException {
+      SCMHeartbeatRequestProto heartbeatRequest = SCMHeartbeatRequestProto
+          .newBuilder()
+          .setDatanodeDetails(datanodeDetails.getProtoBufMessage())
+          .setContainerReport(containerReport)
+          .setDataNodeLayoutVersion(UpgradeUtils.defaultLayoutVersionProto())
+          .build();
+      datanodeScmClient.sendHeartbeat(heartbeatRequest);
+      // scm commands are ignored
+    }
+
+    public void buildContainerReports(List<ContainerInfo> containers) {
+      ContainerReportsProto.Builder reportBuilder =
+          ContainerReportsProto.newBuilder();
+      for (ContainerInfo cinfo : containers) {
+        ContainerReplicaProto.Builder replicaBuilder =
+            ContainerReplicaProto.newBuilder();
+        replicaBuilder.setContainerID(cinfo.getContainerID())
+            .setReadCount(0)
+            .setWriteCount(1)
+            .setReadBytes(0)
+            .setWriteBytes(4096)
+            .setKeyCount(1)
+            .setUsed(4096)
+            .setState(ContainerReplicaProto.State.OPEN)
+            .setDeleteTransactionId(cinfo.getDeleteTransactionId())
+            .setBlockCommitSequenceId(cinfo.getSequenceId())
+            .setOriginNodeId(datanodeDetails.getUuidString());
+
+        reportBuilder.addReports(replicaBuilder.build());
+      }
+
+      containerReport = reportBuilder.build();
+    }
+
+    public DatanodeDetails getDatanodeDetails() {
+      return this.datanodeDetails;
+    }
+  }
+
+  private static DatanodeDetails createRandomDatanodeDetails() {
+    UUID uuid = UUID.randomUUID();
+    String ipAddress =
+        RANDOM.nextInt(256) + "." + RANDOM.nextInt(256) + "." + RANDOM
+            .nextInt(256) + "." + RANDOM.nextInt(256);
+
+    DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.STANDALONE, 0);
+    DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.RATIS, 0);
+    DatanodeDetails.Port restPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.REST, 0);
+    DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
+    builder.setUuid(uuid).setHostName("localhost")
+        .setIpAddress(ipAddress)
+        .addPort(containerPort)
+        .addPort(ratisPort)
+        .addPort(restPort);
+    return builder.build();
+  }
+
+  private static NodeReportProto createNodeReport(UUID nodeId) {
+    List<StorageReportProto> storageReports = new ArrayList<>();
+    List<MetadataStorageReportProto> metadataStorageReports =
+        new ArrayList<>();
+    storageReports.add(createStorageReport(nodeId));
+    metadataStorageReports.add(createMetadataStorageReport());
+    NodeReportProto.Builder nb = NodeReportProto.newBuilder();
+    nb.addAllStorageReport(storageReports)
+        .addAllMetadataStorageReport(metadataStorageReports);
+    return nb.build();
+  }
+
+  private static StorageReportProto createStorageReport(UUID nodeId) {
+    StorageReportProto.Builder srb = StorageReportProto.newBuilder();
+    srb.setStorageUuid(nodeId.toString())
+        .setStorageLocation("/data")
+        .setCapacity(100 * OzoneConsts.TB)
+        .setScmUsed(0)
+        .setFailed(false)
+        .setRemaining(100 * OzoneConsts.TB)
+        .setStorageType(StorageTypeProto.DISK);
+    return srb.build();
+  }
+
+  private static MetadataStorageReportProto createMetadataStorageReport() {
+    MetadataStorageReportProto.Builder mrb =
+        MetadataStorageReportProto.newBuilder();
+    mrb.setStorageLocation("/meta")
+        .setCapacity(100 * OzoneConsts.GB)
+        .setScmUsed(0)
+        .setFailed(false)
+        .setRemaining(100 * OzoneConsts.GB)
+        .setStorageType(StorageTypeProto.DISK);
+    return mrb.build();
+  }
+
+  private static ContainerReportsProto createContainerReport() {
+    return ContainerReportsProto.newBuilder().build();
+  }
+
+  private static PipelineReportsProto createPipelineReport() {
+    return PipelineReportsProto.newBuilder().build();
+  }
+}
\ No newline at end of file
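
As a convenience, the configs recommended in the class javadoc above map to
an ozone-site.xml fragment like the following (values as suggested there;
tune the intervals for your own cluster):

    <property>
      <name>ozone.scm.heartbeat.thread.interval</name>
      <value>1h</value>
    </property>
    <property>
      <name>hdds.heartbeat.interval</name>
      <value>1h</value>
    </property>
    <property>
      <name>ozone.scm.stale.node.interval</name>
      <value>1d</value>
    </property>
    <property>
      <name>ozone.scm.dead.node.interval</name>
      <value>2d</value>
    </property>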

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org