Posted to commits@ozone.apache.org by ri...@apache.org on 2023/02/06 19:18:55 UTC

[ozone] branch master updated: HDDS-7489. Create a Freon tool to simulate datanodes (#4126)

This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e682b8470 HDDS-7489. Create a Freon tool to simulate datanodes (#4126)
3e682b8470 is described below

commit 3e682b84709d881ee257ca9fd4c048987c62b5b1
Author: Duong Nguyen <du...@gmail.com>
AuthorDate: Mon Feb 6 11:18:26 2023 -0800

    HDDS-7489. Create a Freon tool to simulate datanodes (#4126)
---
 .../org/apache/hadoop/hdds/server/JsonUtils.java   |  49 +-
 .../ozone/freon/DatanodeSimulationState.java       | 375 +++++++++++++
 .../hadoop/ozone/freon/DatanodeSimulator.java      | 587 +++++++++++++++++++++
 .../java/org/apache/hadoop/ozone/freon/Freon.java  |   3 +-
 4 files changed, 1004 insertions(+), 10 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java
index b46e48d3d7..cad0f7ffc2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java
@@ -5,25 +5,29 @@
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.hadoop.hdds.server;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.MappingIterator;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SequenceWriter;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -40,6 +44,7 @@ public final class JsonUtils {
   // before use.
   private static final ObjectMapper MAPPER;
   private static final ObjectWriter WRITER;
+
   static {
     MAPPER = new ObjectMapper()
         .setSerializationInclusion(JsonInclude.Include.NON_NULL)
@@ -84,4 +89,30 @@ public final class JsonUtils {
         .constructCollectionType(List.class, elementType);
     return MAPPER.readValue(str, type);
   }
+
+  /**
+   * Utility to sequentially write a large collection of items to a file.
+   */
+  public static <T> void writeToFile(Iterable<T> items, File file)
+      throws IOException {
+    ObjectWriter writer = MAPPER.writer();
+    try (SequenceWriter sequenceWriter = writer.writeValues(file)) {
+      sequenceWriter.init(true);
+      for (T item : items) {
+        sequenceWriter.write(item);
+      }
+    }
+  }
+
+  /**
+   * Utility to sequentially read a large collection of items from a file.
+   */
+  public static <T> List<T> readFromFile(File file, Class<T> itemType)
+      throws IOException {
+    ObjectReader reader = MAPPER.readerFor(itemType);
+    try (MappingIterator<T> mappingIterator = reader.readValues(file)) {
+      return mappingIterator.readAll();
+    }
+  }
+
 }
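
For context, here is a minimal round-trip sketch of the new JsonUtils helpers (the Item POJO and the file path are illustrative only, not part of this patch). Items are streamed one at a time through Jackson's SequenceWriter/MappingIterator, so a large collection never has to be held as a single in-memory JSON document:

    import java.io.File;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hdds.server.JsonUtils;

    public class JsonUtilsRoundTrip {
      // Illustrative POJO; Jackson needs a no-arg constructor and
      // getters/setters (or public fields) to bind it.
      public static class Item {
        private String name;
        public Item() { }
        public Item(String name) { this.name = name; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
      }

      public static void main(String[] args) throws IOException {
        File file = new File("/tmp/items.json");
        // Writes each item through a SequenceWriter rather than
        // serializing the whole list in one call.
        JsonUtils.writeToFile(Arrays.asList(new Item("a"), new Item("b")),
            file);
        // Reads the items back via a MappingIterator.
        List<Item> loaded = JsonUtils.readFromFile(file, Item.class);
        System.out.println(loaded.size()); // prints 2
      }
    }
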
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulationState.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulationState.java
new file mode 100644
index 0000000000..7cf3b48985
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulationState.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Encapsulates the state of a simulated datanode instance.
+ */
+class DatanodeSimulationState {
+  private static final Logger LOGGER = LoggerFactory.getLogger(
+      DatanodeSimulationState.class);
+  public static final long CONTAINER_SIZE = (long) StorageUnit.GB.toBytes(5);
+  private int targetContainersCount;
+
+  private DatanodeDetails datanodeDetails;
+  private boolean isRegistered = false;
+  private long fullContainerReportDurationMs;
+  private Map<InetSocketAddress, EndpointState> endpointStates =
+      new HashMap<>();
+
+  private Set<String> pipelines = new HashSet<>();
+  private Map<Long, ContainerReplicaProto.State> containers =
+      new HashMap<>();
+
+
+  // Indicates whether this node is read-only; no pipelines should be created.
+  private volatile boolean readOnly = false;
+
+  DatanodeSimulationState(DatanodeDetails datanodeDetails,
+                          long fullContainerReportDurationMs,
+                          List<InetSocketAddress> allEndpoints,
+                          int targetContainersCount) {
+    this();
+    this.datanodeDetails = datanodeDetails;
+    this.fullContainerReportDurationMs = fullContainerReportDurationMs;
+    initEndpointsState(allEndpoints);
+    this.targetContainersCount = targetContainersCount;
+  }
+
+  DatanodeSimulationState() {
+  }
+
+  void initEndpointsState(List<InetSocketAddress> allEndpoints) {
+    for (InetSocketAddress endpoint : allEndpoints) {
+      endpointStates.put(endpoint, new EndpointState());
+    }
+  }
+
+  public synchronized void ackHeartbeatResponse(
+      SCMHeartbeatResponseProto response) {
+    for (SCMCommandProto command : response.getCommandsList()) {
+      switch (command.getCommandType()) {
+      case createPipelineCommand:
+        StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto
+            pipelineCmd =
+            command.getCreatePipelineCommandProto();
+        if (!readOnly) {
+          pipelines.add(pipelineCmd.getPipelineID().getId());
+        } else {
+          LOGGER.info("Ignored pipeline creation for {}-{}",
+              pipelineCmd.getType(), pipelineCmd.getFactor());
+        }
+        break;
+      case closePipelineCommand:
+        pipelines.remove(
+            command.getClosePipelineCommandProto()
+                .getPipelineID().getId());
+        break;
+      case closeContainerCommand:
+        StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto
+            closeContainerCmd = command.getCloseContainerCommandProto();
+        closeContainer(closeContainerCmd.getContainerID());
+        break;
+      default:
+        LOGGER.debug("Ignored command: {}",
+            command.getCommandType());
+      }
+    }
+  }
+
+  public synchronized SCMHeartbeatRequestProto heartbeatRequest(
+      InetSocketAddress endpoint, LayoutVersionProto layoutInfo)
+      throws IOException {
+    SCMHeartbeatRequestProto.Builder builder =
+        SCMHeartbeatRequestProto.newBuilder()
+            .setDatanodeDetails(datanodeDetails.getProtoBufMessage())
+            .setDataNodeLayoutVersion(layoutInfo)
+            .setNodeReport(createNodeReport())
+            .setPipelineReports(createPipelineReport());
+
+    addContainerReport(endpoint, builder);
+
+    return builder.build();
+  }
+
+  private void addContainerReport(InetSocketAddress endpoint,
+                                  SCMHeartbeatRequestProto.Builder builder) {
+    EndpointState state = endpointStates.get(endpoint);
+    if (state.nextFullContainerReport.compareTo(Instant.now()) <= 0) {
+      builder.setContainerReport(createFullContainerReport());
+
+      // Every datanode starts with a full report; the next full report is
+      // then scheduled at a random point within one full cycle, to avoid
+      // peaks.
+      if (state.nextFullContainerReport == Instant.MIN) {
+        state.nextFullContainerReport = Instant.now().plusMillis(
+            RandomUtils.nextLong(1, fullContainerReportDurationMs));
+      } else {
+        state.nextFullContainerReport = Instant.now()
+            .plusMillis(fullContainerReportDurationMs);
+      }
+      state.icr.clear();
+    } else {
+      if (state.icr.getReportCount() > 0) {
+        builder.addIncrementalContainerReport(state.icr.build());
+        state.icr.clear();
+      }
+    }
+  }
+
+  ContainerReportsProto createFullContainerReport() {
+    ContainerReportsProto.Builder builder = ContainerReportsProto.newBuilder();
+    for (Map.Entry<Long, ContainerReplicaProto.State> entry :
+        containers.entrySet()) {
+      ContainerReplicaProto container =
+          ContainerReplicaProto.newBuilder()
+              .setContainerID(entry.getKey())
+              .setReadCount(10_000)
+              .setWriteCount(10_000)
+              .setReadBytes(10_000_000L)
+              .setWriteBytes(5_000_000_000L)
+              .setKeyCount(10_000)
+              .setUsed(5_000_000_000L)
+              .setState(entry.getValue())
+              .setBlockCommitSequenceId(1000)
+              .setOriginNodeId(datanodeDetails.getUuidString())
+              .setReplicaIndex(0)
+              .build();
+      builder.addReports(container);
+    }
+    return builder.build();
+  }
+
+  PipelineReportsProto createPipelineReport() {
+    PipelineReportsProto.Builder builder = PipelineReportsProto.newBuilder();
+    for (String pipelineId : pipelines) {
+      builder.addPipelineReport(
+          StorageContainerDatanodeProtocolProtos.PipelineReport.newBuilder()
+              .setPipelineID(HddsProtos.PipelineID
+                  .newBuilder().setId(pipelineId).build())
+              .setIsLeader(true).build());
+    }
+    return builder.build();
+  }
+
+  StorageContainerDatanodeProtocolProtos.NodeReportProto createNodeReport()
+      throws IOException {
+    long capacity = targetContainersCount * CONTAINER_SIZE;
+    long used = readOnly ? capacity :
+        CONTAINER_SIZE * containers.size();
+
+    StorageLocationReport storageLocationReport = StorageLocationReport
+        .newBuilder()
+        .setStorageLocation("/tmp/unreal_storage")
+        .setId("simulated-storage-volume")
+        .setCapacity(capacity)
+        .setScmUsed(used)
+        .setRemaining(capacity - used)
+        .setStorageType(StorageType.DEFAULT)
+        .build();
+
+    StorageLocationReport metaLocationReport = StorageLocationReport
+        .newBuilder()
+        .setStorageLocation("/tmp/unreal_metadata")
+        .setId("simulated-storage-volume")
+        .setCapacity((long) StorageUnit.GB.toBytes(100))
+        .setScmUsed((long) StorageUnit.GB.toBytes(50))
+        .setRemaining((long) StorageUnit.GB.toBytes(50))
+        .setStorageType(StorageType.DEFAULT)
+        .build();
+
+    return StorageContainerDatanodeProtocolProtos.NodeReportProto.newBuilder()
+        .addStorageReport(storageLocationReport.getProtoBufMessage())
+        .addMetadataStorageReport(
+            metaLocationReport.getMetadataProtoBufMessage())
+        .build();
+  }
+
+  public synchronized void newContainer(long containerId) {
+    containers.put(containerId, ContainerReplicaProto.State.OPEN);
+    for (EndpointState state : endpointStates.values()) {
+      state.icr.addReport(
+          ContainerReplicaProto.newBuilder()
+              .setContainerID(containerId)
+              .setReadCount(10_000)
+              .setWriteCount(10_000)
+              .setReadBytes(10_000_000L)
+              .setWriteBytes(5_000_000_000L)
+              .setKeyCount(10_000)
+              .setUsed(CONTAINER_SIZE)
+              .setState(ContainerReplicaProto.State.OPEN)
+              .setBlockCommitSequenceId(1000)
+              .setOriginNodeId(datanodeDetails.getUuidString())
+              .setReplicaIndex(0)
+              .build()
+      );
+    }
+  }
+
+  public synchronized void closeContainer(Long containerID) {
+    if (containers.containsKey(containerID)) {
+      containers.put(containerID, ContainerReplicaProto.State.CLOSED);
+      for (EndpointState state : endpointStates.values()) {
+        state.icr.addReport(
+            ContainerReplicaProto.newBuilder()
+                .setContainerID(containerID)
+                .setReadCount(10_000)
+                .setWriteCount(10_000)
+                .setReadBytes(10_000_000L)
+                .setWriteBytes(5_000_000_000L)
+                .setKeyCount(10_000)
+                .setUsed(5_000_000_000L)
+                .setState(ContainerReplicaProto.State.CLOSED)
+                .setBlockCommitSequenceId(1000)
+                .setOriginNodeId(datanodeDetails.getUuidString())
+                .setReplicaIndex(0)
+                .build()
+        );
+      }
+    } else {
+      LOGGER.error("Unrecognized closeContainerCommand");
+    }
+  }
+
+  @JsonSerialize(using = DatanodeDetailsSerializer.class)
+  @JsonDeserialize(using = DatanodeDeserializer.class)
+  public synchronized DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+
+  public synchronized void setDatanodeDetails(
+      DatanodeDetails datanodeDetails) {
+    this.datanodeDetails = datanodeDetails;
+  }
+
+  public synchronized Set<String> getPipelines() {
+    return pipelines;
+  }
+
+  public synchronized void setPipelines(Set<String> pipelines) {
+    this.pipelines = pipelines;
+  }
+
+  public synchronized boolean isRegistered() {
+    return isRegistered;
+  }
+
+  public synchronized void setRegistered(boolean registered) {
+    isRegistered = registered;
+  }
+
+  public synchronized Map<Long, ContainerReplicaProto.State> getContainers() {
+    return containers;
+  }
+
+  public synchronized void setContainers(
+      Map<Long, ContainerReplicaProto.State> containers) {
+    this.containers = containers;
+  }
+
+  public void setReadOnly(boolean readOnly) {
+    this.readOnly = readOnly;
+  }
+
+  public long getFullContainerReportDurationMs() {
+    return fullContainerReportDurationMs;
+  }
+
+  public void setFullContainerReportDurationMs(
+      long fullContainerReportDurationMs) {
+    this.fullContainerReportDurationMs = fullContainerReportDurationMs;
+  }
+
+  public int getTargetContainersCount() {
+    return targetContainersCount;
+  }
+
+  public void setTargetContainersCount(int targetContainersCount) {
+    this.targetContainersCount = targetContainersCount;
+  }
+
+  private static class DatanodeDetailsSerializer
+      extends StdSerializer<DatanodeDetails> {
+    protected DatanodeDetailsSerializer() {
+      super(DatanodeDetails.class);
+    }
+
+    @Override
+    public void serialize(DatanodeDetails value, JsonGenerator gen,
+                          SerializerProvider provider) throws IOException {
+      gen.writeBinary(value.getProtoBufMessage().toByteArray());
+    }
+  }
+
+  private static class DatanodeDeserializer
+      extends StdDeserializer<DatanodeDetails> {
+    protected DatanodeDeserializer() {
+      super(DatanodeDetails.class);
+    }
+
+    @Override
+    public DatanodeDetails deserialize(JsonParser p,
+                                       DeserializationContext ctxt)
+        throws IOException {
+      byte[] binaryValue = p.getBinaryValue();
+      return DatanodeDetails.getFromProtoBuf(
+          HddsProtos.DatanodeDetailsProto.parseFrom(binaryValue));
+    }
+  }
+
+  private static class EndpointState {
+    private final IncrementalContainerReportProto.Builder icr =
+        IncrementalContainerReportProto.newBuilder();
+
+    private Instant nextFullContainerReport = Instant.MIN;
+  }
+}
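
A note on the custom (de)serializers above: DatanodeDetails is persisted in the JSON state file as its protobuf bytes, which Jackson's writeBinary/getBinaryValue encode as a base64 string. Below is a self-contained sketch of the same pattern, assuming only Jackson itself (the Holder type is illustrative, not part of the patch):

    import java.io.IOException;

    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.databind.DeserializationContext;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializerProvider;
    import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
    import com.fasterxml.jackson.databind.ser.std.StdSerializer;

    public class BinaryFieldRoundTrip {
      // Stand-in for DatanodeDetails: any object whose canonical wire form
      // is a byte[] (e.g. protobuf bytes) can be embedded in JSON this way.
      public static class Holder {
        @JsonSerialize(using = BytesSerializer.class)
        @JsonDeserialize(using = BytesDeserializer.class)
        private byte[] payload;
        public byte[] getPayload() { return payload; }
        public void setPayload(byte[] payload) { this.payload = payload; }
      }

      static class BytesSerializer extends StdSerializer<byte[]> {
        protected BytesSerializer() { super(byte[].class); }
        @Override
        public void serialize(byte[] value, JsonGenerator gen,
            SerializerProvider provider) throws IOException {
          gen.writeBinary(value); // emitted as a base64-encoded JSON string
        }
      }

      static class BytesDeserializer extends StdDeserializer<byte[]> {
        protected BytesDeserializer() { super(byte[].class); }
        @Override
        public byte[] deserialize(JsonParser p, DeserializationContext ctxt)
            throws IOException {
          return p.getBinaryValue(); // decodes the base64 string to bytes
        }
      }

      public static void main(String[] args) throws IOException {
        Holder h = new Holder();
        h.setPayload(new byte[] {1, 2, 3});
        String json = new ObjectMapper().writeValueAsString(h);
        Holder back = new ObjectMapper().readValue(json, Holder.class);
        System.out.println(back.getPayload().length); // prints 3
      }
    }
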
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java
new file mode 100644
index 0000000000..20800757b1
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeSimulator.java
@@ -0,0 +1,587 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.DatanodeVersion;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.server.JsonUtils;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.common.Storage;
+import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocolPB.ReconDatanodeProtocolPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdds.HddsUtils.getReconAddresses;
+import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmRpcRetryCount;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmRpcRetryInterval;
+import static org.apache.hadoop.hdds.utils.HddsVersionInfo.HDDS_VERSION_INFO;
+
+/**
+ * This command simulates a number of datanodes and coordinates with
+ * SCM to create a number of containers on those datanodes.
+ * <p>
+ * This tool was created to verify the ability of SCM/Recon to handle
+ * thousands of datanodes and exabytes of container data.
+ * <p>
+ * Usage:
+ * ozone freon simulate-datanode -t 20 -n 5000 -c 40000
+ * -t: number of threads used to run datanode heartbeats.
+ * -n: number of datanodes to simulate.
+ * -c: number of containers to simulate per datanode.
+ * <p>
+ * The simulation can be stopped and resumed safely, as datanode state,
+ * including pipelines and containers, is saved to a file when the process
+ * exits.
+ * <p>
+ * When the number of containers reaches the target, all datanodes are
+ * transitioned to read-only mode (all pipelines are closed).
+ */
+@CommandLine.Command(name = "simulate-datanode",
+    description =
+        "Simulate one or many datanodes and register them to SCM." +
+            "This is used to stress test SCM handling a massive cluster.",
+    versionProvider = HddsVersionProvider.class,
+    mixinStandardHelpOptions = true,
+    showDefaultValues = true)
+public class DatanodeSimulator implements Callable<Void> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(
+      DatanodeSimulator.class);
+
+  private Map<InetSocketAddress,
+      StorageContainerDatanodeProtocolClientSideTranslatorPB> scmClients;
+  private InetSocketAddress reconAddress;
+  private StorageContainerDatanodeProtocolClientSideTranslatorPB reconClient;
+
+  private ConfigurationSource conf;
+  private List<DatanodeSimulationState> datanodes;
+  private Map<UUID, DatanodeSimulationState> datanodesMap;
+
+  private ScheduledExecutorService heartbeatScheduler;
+  private LayoutVersionProto layoutInfo;
+
+  @CommandLine.ParentCommand
+  private Freon freonCommand;
+  @CommandLine.Option(names = {"-t", "--threads"},
+      description = "Size of the threadpool running heartbeat.",
+      defaultValue = "10")
+  private int threadCount = 10;
+  @CommandLine.Option(names = {"-n", "--nodes"},
+      description = "Number of simulated datanode instances.",
+      defaultValue = "1")
+  private int datanodesCount = 1;
+
+  @CommandLine.Option(names = {"-c", "--containers"},
+      description = "Number of simulated containers per datanode.",
+      defaultValue = "5")
+  private int containers = 5;
+
+  @CommandLine.Option(names = {"-r", "--reload"},
+      description = "Reload the datanodes created by previous simulation run.",
+      defaultValue = "true")
+  private boolean reload = true;
+
+  private Random random = new Random();
+
+  // stats
+  private AtomicLong totalHeartbeats = new AtomicLong(0);
+  private AtomicLong totalFCRs = new AtomicLong(0);
+  private AtomicLong totalICRs = new AtomicLong(0);
+  private StorageContainerLocationProtocol scmContainerClient;
+
+  @Override
+  public Void call() throws Exception {
+    init();
+    loadOrCreateDatanodes();
+
+    // Register datanodes to SCM/Recon and schedule heartbeat for each.
+    int successCount = 0;
+    for (DatanodeSimulationState dn : datanodes) {
+      successCount += startDatanode(dn) ? 1 : 0;
+    }
+
+    LOGGER.info("{} datanodes have been created and registered to SCM/Recon",
+        successCount);
+
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(() -> {
+          heartbeatScheduler.shutdown();
+          try {
+            heartbeatScheduler.awaitTermination(30, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+          scmClients.values().forEach(IOUtils::closeQuietly);
+          IOUtils.closeQuietly(reconClient);
+          LOGGER.info("Successfully closed all the used resources");
+          saveDatanodesToFile();
+        })
+    );
+
+    backgroundStatsReporter();
+
+    // Start creating containers and add them to the simulated datanodes'
+    // state as per SCM's assignment.
+    // When this simulation runs on a cluster with some real datanodes,
+    // several containers will be assigned to a mixture of real and
+    // simulated datanodes, resulting in some containers being recorded as
+    // under-replicated. This should be fine, as those containers don't
+    // participate in any real operations and the simulation is expected to
+    // run with a dominant number of simulated datanodes.
+    LOGGER.info("Start growing containers.");
+    try {
+      growContainers();
+    } catch (IOException e) {
+      LOGGER.error("Error creating containers, exiting.", e);
+      throw e;
+    }
+
+    // After reaching the number of expected containers, the simulated
+    // datanodes are moved to read-only states to avoid participating in
+    // the normal write operations that should only happen on real datanodes.
+    LOGGER.info("Finished creating container, " +
+        "transitioning datanodes to readonly");
+    moveDatanodesToReadonly();
+    LOGGER.info("All datanodes have been transitioned to read-only.");
+
+    return null;
+  }
+
+  private void moveDatanodesToReadonly() {
+    for (DatanodeSimulationState dn : datanodes) {
+      dn.setReadOnly(true);
+      for (String pipeline : dn.getPipelines()) {
+        try {
+          scmContainerClient.closePipeline(HddsProtos.PipelineID.newBuilder()
+              .setId(pipeline).build());
+        } catch (IOException e) {
+          LOGGER.error("Error closing pipeline {}", pipeline, e);
+        }
+      }
+    }
+  }
+
+  private void backgroundStatsReporter() {
+    long interval = getScmHeartbeatInterval(conf);
+    final AtomicLong lastTotalHeartbeats = new AtomicLong(0);
+    final AtomicLong lastTotalFCRs = new AtomicLong(0);
+    final AtomicLong lastTotalICRs = new AtomicLong(0);
+    final AtomicReference<Instant> lastCheck =
+        new AtomicReference<>(Instant.now());
+    heartbeatScheduler.scheduleAtFixedRate(() -> {
+
+      long heartbeats = totalHeartbeats.get() - lastTotalHeartbeats.get();
+      lastTotalHeartbeats.set(totalHeartbeats.get());
+      long fcrs = totalFCRs.get() - lastTotalFCRs.get();
+      lastTotalFCRs.set(totalFCRs.get());
+
+      long icrs = totalICRs.get() - lastTotalICRs.get();
+      lastTotalICRs.set(totalICRs.get());
+
+      long intervalInSeconds = Instant.now().getEpochSecond()
+          - lastCheck.get().getEpochSecond();
+      lastCheck.set(Instant.now());
+
+      LOGGER.info("Heartbeat status: \n" +
+              "Total heartbeat in cycle: {} ({} per second) \n" +
+              "Total incremental reported in cycle: {} ({} per second) \n" +
+              "Total full reported in cycle: {} ({} per second)",
+          heartbeats, heartbeats / intervalInSeconds,
+          icrs, icrs / intervalInSeconds,
+          fcrs, fcrs / intervalInSeconds);
+    }, interval, interval, TimeUnit.MILLISECONDS);
+  }
+
+  private void loadOrCreateDatanodes() throws UnknownHostException {
+    List<InetSocketAddress> allEndpoints = new LinkedList<>(
+        scmClients.keySet());
+    allEndpoints.add(reconAddress);
+
+    if (reload) {
+      datanodes = loadDatanodesFromFile();
+      for (DatanodeSimulationState datanode : datanodes) {
+        datanode.initEndpointsState(allEndpoints);
+      }
+    } else {
+      datanodes = new ArrayList<>(datanodesCount);
+    }
+
+    long containerReportInterval = conf.getTimeDuration(
+        HDDS_CONTAINER_REPORT_INTERVAL,
+        HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+
+    for (int i = datanodes.size(); i < datanodesCount; i++) {
+      datanodes.add(new DatanodeSimulationState(randomDatanodeDetails(conf),
+          containerReportInterval, allEndpoints, containers));
+    }
+
+    datanodesMap = new HashMap<>();
+    for (DatanodeSimulationState datanode : datanodes) {
+      datanodesMap.put(datanode.getDatanodeDetails().getUuid(), datanode);
+    }
+  }
+
+  /**
+   * Calls SCM to allocate new containers and add the containers to datanodes
+   * state according to the allocation response from SCM.
+   */
+  private void growContainers() throws IOException {
+    int totalAssignedContainers = 0;
+    for (DatanodeSimulationState datanode : datanodes) {
+      totalAssignedContainers += datanode.getContainers().size();
+    }
+
+    int totalExpectedContainers = datanodesCount * containers;
+    int totalCreatedContainers = 0;
+    while (totalAssignedContainers < totalExpectedContainers) {
+      ContainerWithPipeline cp =
+          scmContainerClient.allocateContainer(ReplicationType.RATIS,
+              ReplicationFactor.THREE, "test");
+
+      for (DatanodeDetails datanode : cp.getPipeline().getNodeSet()) {
+        if (datanodesMap.containsKey(datanode.getUuid())) {
+          datanodesMap.get(datanode.getUuid())
+              .newContainer(cp.getContainerInfo().getContainerID());
+          totalAssignedContainers++;
+        }
+      }
+
+      totalCreatedContainers++;
+      // Close the newly created container immediately.
+      scmContainerClient.closeContainer(cp.getContainerInfo().getContainerID());
+    }
+
+    LOGGER.info("Finish assigning {} containers from {} created containers.",
+        totalAssignedContainers, totalCreatedContainers);
+  }
+
+  private boolean startDatanode(DatanodeSimulationState dn)
+      throws IOException {
+    if (!registerDataNode(dn)) {
+      LOGGER.info("Failed to register datanode to SCM: {}",
+          dn.getDatanodeDetails().getUuidString());
+      return false;
+    }
+
+    // Schedule heartbeat tasks for the given datanode to all SCMs/Recon.
+    long scmHeartbeatInterval = HddsServerUtil.getScmHeartbeatInterval(conf);
+    scmClients.forEach((endpoint, client) -> {
+      // Use random initial delay as a jitter to avoid peaks.
+      long initialDelay = RandomUtils.nextLong(0, scmHeartbeatInterval);
+      Runnable runnable = () -> heartbeat(endpoint, client, dn);
+      heartbeatScheduler.scheduleAtFixedRate(runnable, initialDelay,
+          scmHeartbeatInterval, TimeUnit.MILLISECONDS);
+    });
+
+    long reconHeartbeatInterval =
+        HddsServerUtil.getReconHeartbeatInterval(conf);
+    long initialDelay = RandomUtils.nextLong(0, reconHeartbeatInterval);
+    Runnable runnable = () -> heartbeat(reconAddress, reconClient, dn);
+    heartbeatScheduler.scheduleAtFixedRate(runnable, initialDelay,
+        reconHeartbeatInterval, TimeUnit.MILLISECONDS);
+
+    LOGGER.info("Successfully registered datanode to SCM: {}",
+        dn.getDatanodeDetails().getUuidString());
+    return true;
+  }
+
+  void saveDatanodesToFile() {
+    File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+    File file = new File(metaDirPath, "datanode-simulation.json");
+    try {
+      JsonUtils.writeToFile(datanodes, file);
+    } catch (IOException e) {
+      throw new RuntimeException("Error saving datanodes to file.", e);
+    }
+    LOGGER.info("{} datanodes has been saved to {}", datanodes.size(),
+        file.getAbsolutePath());
+  }
+
+  List<DatanodeSimulationState> loadDatanodesFromFile() {
+    File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+    File file = new File(metaDirPath, "datanode-simulation.json");
+    if (!file.exists()) {
+      LOGGER.info("File {} doesn't exists, nothing is loaded",
+          file.getAbsolutePath());
+      return new ArrayList<>();
+    }
+    try {
+      return JsonUtils.readFromFile(file, DatanodeSimulationState.class);
+    } catch (IOException e) {
+      throw new RuntimeException("Error Reading datanodes from file.", e);
+    }
+  }
+
+  private void heartbeat(InetSocketAddress endpoint,
+                         StorageContainerDatanodeProtocol client,
+                         DatanodeSimulationState dn) {
+    try {
+      SCMHeartbeatRequestProto heartbeat = dn.heartbeatRequest(endpoint,
+          layoutInfo);
+      SCMHeartbeatResponseProto response = client.sendHeartbeat(heartbeat);
+      dn.ackHeartbeatResponse(response);
+
+      totalHeartbeats.incrementAndGet();
+      if (heartbeat.hasContainerReport()) {
+        totalFCRs.incrementAndGet();
+      } else {
+        totalICRs.addAndGet(heartbeat.getIncrementalContainerReportCount());
+      }
+
+      // If SCM requests this datanode to register, re-register
+      // immediately.
+      if (response.getCommandsList().stream()
+          .anyMatch(x -> x.getCommandType() ==
+              SCMCommandProto.Type.reregisterCommand)) {
+        client.register(
+            dn.getDatanodeDetails().getExtendedProtoBufMessage(),
+            dn.createNodeReport(), dn.createFullContainerReport(),
+            dn.createPipelineReport(), this.layoutInfo);
+      }
+    } catch (Exception e) {
+      LOGGER.info("Error sending heartbeat for {}: {}",
+          dn.getDatanodeDetails().getUuidString(), e.getMessage(), e);
+    }
+  }
+
+  private void init() throws IOException {
+    conf = freonCommand.createOzoneConfiguration();
+    Collection<InetSocketAddress> addresses = getSCMAddressForDatanodes(conf);
+    scmClients = new HashMap<>(addresses.size());
+    for (InetSocketAddress address : addresses) {
+      scmClients.put(address, createScmClient(address));
+    }
+
+    reconAddress = getReconAddresses(conf);
+    reconClient = createReconClient(reconAddress);
+
+    heartbeatScheduler = Executors.newScheduledThreadPool(threadCount);
+
+    scmContainerClient = HAUtils.getScmContainerClient(conf);
+
+    this.layoutInfo = createLayoutInfo();
+  }
+
+  private LayoutVersionProto createLayoutInfo() throws IOException {
+    Storage layoutStorage = new DatanodeLayoutStorage(conf,
+        UUID.randomUUID().toString());
+
+    HDDSLayoutVersionManager layoutVersionManager =
+        new HDDSLayoutVersionManager(layoutStorage.getLayoutVersion());
+
+    return LayoutVersionProto.newBuilder()
+        .setMetadataLayoutVersion(
+            layoutVersionManager.getMetadataLayoutVersion())
+        .setSoftwareLayoutVersion(
+            layoutVersionManager.getSoftwareLayoutVersion())
+        .build();
+  }
+
+  private DatanodeDetails randomDatanodeDetails(ConfigurationSource config)
+      throws UnknownHostException {
+    DatanodeDetails details = DatanodeDetails.newBuilder()
+        .setUuid(UUID.randomUUID())
+        .build();
+    details.setInitialVersion(DatanodeVersion.CURRENT_VERSION);
+    details.setCurrentVersion(DatanodeVersion.CURRENT_VERSION);
+    details.setHostName(HddsUtils.getHostName(config));
+    details.setIpAddress(randomIp());
+    details.setPort(DatanodeDetails.Port.Name.STANDALONE, 0);
+    details.setPort(DatanodeDetails.Port.Name.RATIS, 0);
+    details.setPort(DatanodeDetails.Port.Name.REST, 0);
+    details.setVersion(HDDS_VERSION_INFO.getVersion());
+    details.setSetupTime(Time.now());
+    details.setRevision(HDDS_VERSION_INFO.getRevision());
+    details.setBuildDate(HDDS_VERSION_INFO.getDate());
+    details.setCurrentVersion(DatanodeVersion.CURRENT_VERSION);
+    return details;
+  }
+
+  private boolean registerDataNode(DatanodeSimulationState dn)
+      throws IOException {
+
+    ContainerReportsProto containerReports =
+        ContainerReportsProto.newBuilder().build();
+
+    NodeReportProto nodeReport = dn.createNodeReport();
+
+    PipelineReportsProto pipelineReports = PipelineReportsProto
+        .newBuilder().build();
+    boolean isRegistered = false;
+
+    for (StorageContainerDatanodeProtocol client : scmClients.values()) {
+      try {
+        SCMRegisteredResponseProto response =
+            client.register(
+                dn.getDatanodeDetails().getExtendedProtoBufMessage(),
+                nodeReport, containerReports, pipelineReports, this.layoutInfo);
+        if (response.hasHostname() && response.hasIpAddress()) {
+          dn.getDatanodeDetails().setHostName(response.getHostname());
+          dn.getDatanodeDetails().setIpAddress(response.getIpAddress());
+        }
+        if (response.hasNetworkName() && response.hasNetworkLocation()) {
+          dn.getDatanodeDetails().setNetworkName(response.getNetworkName());
+          dn.getDatanodeDetails()
+              .setNetworkLocation(response.getNetworkLocation());
+        }
+        isRegistered = isRegistered ||
+            (response.getErrorCode() ==
+                SCMRegisteredResponseProto.ErrorCode.success);
+      } catch (IOException e) {
+        LOGGER.error("Error register datanode to SCM", e);
+      }
+    }
+
+    try {
+      reconClient.register(dn.getDatanodeDetails().getExtendedProtoBufMessage(),
+          nodeReport, containerReports, pipelineReports, this.layoutInfo);
+    } catch (IOException e) {
+      LOGGER.error("Error register datanode to Recon", e);
+    }
+
+    dn.setRegistered(isRegistered);
+
+    return isRegistered;
+  }
+
+  private StorageContainerDatanodeProtocolClientSideTranslatorPB
+      createScmClient(InetSocketAddress address) throws IOException {
+
+    Configuration hadoopConfig =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
+    RPC.setProtocolEngine(
+        hadoopConfig,
+        StorageContainerDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long version =
+        RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+
+    RetryPolicy retryPolicy =
+        RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+            getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
+            TimeUnit.MILLISECONDS);
+
+    StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
+        StorageContainerDatanodeProtocolPB.class, version,
+        address, UserGroupInformation.getCurrentUser(), hadoopConfig,
+        NetUtils.getDefaultSocketFactory(hadoopConfig),
+        getScmRpcTimeOutInMilliseconds(conf),
+        retryPolicy).getProxy();
+
+    return new StorageContainerDatanodeProtocolClientSideTranslatorPB(
+        rpcProxy);
+  }
+
+  private StorageContainerDatanodeProtocolClientSideTranslatorPB
+      createReconClient(InetSocketAddress address) throws IOException {
+    Configuration hadoopConfig =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
+    RPC.setProtocolEngine(hadoopConfig, ReconDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long version =
+        RPC.getProtocolVersion(ReconDatanodeProtocolPB.class);
+
+    RetryPolicy retryPolicy =
+        RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+            getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
+            TimeUnit.MILLISECONDS);
+    ReconDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
+        ReconDatanodeProtocolPB.class, version,
+        address, UserGroupInformation.getCurrentUser(), hadoopConfig,
+        NetUtils.getDefaultSocketFactory(hadoopConfig),
+        getScmRpcTimeOutInMilliseconds(conf),
+        retryPolicy).getProxy();
+
+    return new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
+  }
+
+  private String randomIp() {
+    return random.nextInt(256) + "." +
+        random.nextInt(256) + "." +
+        random.nextInt(256) + "." +
+        random.nextInt(256);
+  }
+
+  private static int getScmRpcTimeOutInMilliseconds(ConfigurationSource conf) {
+    return (int) conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT,
+        OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+  }
+
+}
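
One detail worth highlighting: both the heartbeat tasks in startDatanode and the full container reports use a random offset within one interval, so thousands of simulated datanodes don't all fire at the same instant. Here is a standalone sketch of that jitter idea (the 30-second interval and node count are assumptions for illustration):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;

    public class JitteredHeartbeats {
      public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(4);
        long intervalMs = 30_000L; // assumed heartbeat interval
        for (int i = 0; i < 1_000; i++) {
          final int node = i;
          // A random initial delay within one interval spreads the
          // fixed-rate tasks across the cycle instead of creating a
          // thundering herd on the receiver.
          long jitter = ThreadLocalRandom.current().nextLong(0, intervalMs);
          pool.scheduleAtFixedRate(
              () -> System.out.println("heartbeat from node " + node),
              jitter, intervalMs, TimeUnit.MILLISECONDS);
        }
      }
    }
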
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index dfb52ca9ad..fcb9a5b27e 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -69,7 +69,8 @@ import picocli.CommandLine.Option;
         OmBucketReadWriteKeyOps.class,
         OmRPCLoadGenerator.class,
         OzoneClientKeyReadWriteOps.class,
-        RangeKeysGenerator.class
+        RangeKeysGenerator.class,
+        DatanodeSimulator.class
     },
     versionProvider = HddsVersionProvider.class,
     mixinStandardHelpOptions = true)

