Posted to commits@ozone.apache.org by xy...@apache.org on 2021/06/02 21:16:12 UTC

[ozone] branch master updated: HDDS-4483. Datanodes should send last processed CRL sequence ID in heartbeats (#2285)

This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 095d09b  HDDS-4483. Datanodes should send last processed CRL sequence ID in heartbeats (#2285)
095d09b is described below

commit 095d09b088b24c2f74f01bd50c99aed2e1a9370e
Author: Vivek Ratnavel Subramanian <vi...@gmail.com>
AuthorDate: Wed Jun 2 14:15:55 2021 -0700

    HDDS-4483. Datanodes should send last processed CRL sequence ID in heartbeats (#2285)
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |  8 ++-
 .../common/src/main/resources/ozone-default.xml    |  8 +++
 .../apache/hadoop/ozone/HddsDatanodeService.java   |  2 +-
 .../common/report/CRLStatusReportPublisher.java    | 82 ++++++++++++++++++++++
 .../common/report/ReportPublisherFactory.java      |  2 +
 .../common/statemachine/DatanodeStateMachine.java  | 21 ++++--
 .../common/statemachine/StateContext.java          | 21 +++++-
 .../container/common/TestDatanodeStateMachine.java | 13 ++--
 .../common/report/TestReportPublisher.java         | 47 +++++++++++++
 .../common/report/TestReportPublisherFactory.java  | 13 +++-
 .../common/statemachine/TestStateContext.java      | 70 +++++-------------
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto | 12 ++++
 .../ozone/container/common/TestEndPoint.java       |  2 +-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |  8 +--
 14 files changed, 237 insertions(+), 72 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 62a6012..e05eda7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -34,10 +34,14 @@ public final class HddsConfigKeys {
       "hdds.container.report.interval";
   public static final String HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT =
       "60s";
+  public static final String HDDS_CRL_STATUS_REPORT_INTERVAL =
+      "hdds.crl.status.report.interval";
+  public static final String HDDS_CRL_STATUS_REPORT_INTERVAL_DEFAULT =
+      "60s";
   public static final String HDDS_PIPELINE_REPORT_INTERVAL =
-          "hdds.pipeline.report.interval";
+      "hdds.pipeline.report.interval";
   public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT =
-          "60s";
+      "60s";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
       "hdds.command.status.report.interval";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2014aea..027681b 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -316,6 +316,14 @@
       defined with postfix (ns,ms,s,m,h,d)</description>
   </property>
   <property>
+    <name>hdds.crl.status.report.interval</name>
+    <value>60000ms</value>
+    <tag>OZONE, SECURITY, MANAGEMENT</tag>
+    <description>Time interval of the datanode to send CRL status report. Each
+      datanode periodically sends CRL status report to SCM. Unit could be
+      defined with postfix (ns,ms,s,m,h,d)</description>
+  </property>
+  <property>
     <name>hdds.pipeline.report.interval</name>
     <value>60000ms</value>
     <tag>OZONE, PIPELINE, MANAGEMENT</tag>
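
As a usage aside (not part of the patch): the new interval is read through
getTimeDuration, so any of the postfixes listed above work. A minimal sketch,
assuming an OzoneConfiguration and a hypothetical class name; note that the
publisher added later in this patch rejects values lower than the SCM
heartbeat interval:

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CRL_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CRL_STATUS_REPORT_INTERVAL_DEFAULT;

public class CrlReportIntervalSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Override the default of "60s"; must not be lower than the SCM
    // heartbeat interval, or the publisher's Preconditions check fails.
    conf.set(HDDS_CRL_STATUS_REPORT_INTERVAL, "120s");
    long intervalMs = conf.getTimeDuration(
        HDDS_CRL_STATUS_REPORT_INTERVAL,
        HDDS_CRL_STATUS_REPORT_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);
    System.out.println("CRL status report interval: " + intervalMs + " ms");
  }
}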
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index baa5889..1710090 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -253,7 +253,7 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
         initializeCertificateClient(conf);
       }
       datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
-          dnCertClient, this::terminateDatanode);
+          dnCertClient, this::terminateDatanode, dnCRLStore);
       try {
         httpServer = new HddsDatanodeHttpServer(conf);
         httpServer.start();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CRLStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CRLStatusReportPublisher.java
new file mode 100644
index 0000000..9b7cf9a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CRLStatusReportPublisher.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CRL_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CRL_STATUS_REPORT_INTERVAL_DEFAULT;
+
+/**
+ * Publishes CRLStatusReport which will be sent to SCM as part of heartbeat.
+ * CRLStatusReport consists of the following information:
+ *   - receivedCRLId : The latest processed CRL Sequence ID.
+ *   - pendingCRLIds : The list of CRL IDs that are still pending in the
+ *   queue to be processed in the future.
+ */
+public class CRLStatusReportPublisher extends
+    ReportPublisher<CRLStatusReport> {
+
+  private Long crlStatusReportInterval = null;
+
+  @Override
+  protected long getReportFrequency() {
+    if (crlStatusReportInterval == null) {
+      crlStatusReportInterval = getConf().getTimeDuration(
+          HDDS_CRL_STATUS_REPORT_INTERVAL,
+          HDDS_CRL_STATUS_REPORT_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+
+      long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval(
+          getConf());
+
+      Preconditions.checkState(
+          heartbeatFrequency <= crlStatusReportInterval,
+          HDDS_CRL_STATUS_REPORT_INTERVAL +
+              " cannot be configured lower than heartbeat frequency.");
+    }
+    return crlStatusReportInterval;
+  }
+
+  @Override
+  protected CRLStatusReport getReport() throws IOException {
+
+    CRLStatusReport.Builder builder = CRLStatusReport.newBuilder();
+
+    DatanodeCRLStore dnCRLStore = this.getContext().getParent().getDnCRLStore();
+
+    builder.setReceivedCrlId(dnCRLStore.getLatestCRLSequenceID());
+    if (dnCRLStore.getPendingCRLs().size() > 0) {
+      List<Long> pendingCRLIds =
+          dnCRLStore.getPendingCRLs().stream().map(CRLInfo::getCrlSequenceID)
+              .collect(Collectors.toList());
+      builder.addAllPendingCrlIds(pendingCRLIds);
+    }
+
+    return builder.build();
+  }
+}
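
To make the report shape concrete, here is a small self-contained sketch
(not part of the patch; class name hypothetical) that builds the same
CRLStatusReport the publisher assembles from the datanode CRL store, using
the values the unit test further down expects:

import java.util.Arrays;

import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;

public class CrlStatusReportShapeSketch {
  public static void main(String[] args) {
    // receivedCrlId: the latest processed CRL sequence ID.
    // pendingCrlIds: CRL sequence IDs still queued for processing.
    CRLStatusReport report = CRLStatusReport.newBuilder()
        .setReceivedCrlId(3L)
        .addAllPendingCrlIds(Arrays.asList(100L, 101L))
        .build();
    System.out.println(report.getReceivedCrlId());     // 3
    System.out.println(report.getPendingCrlIdsList()); // [100, 101]
  }
}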
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index 4533691..82a3c41 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
@@ -53,6 +54,7 @@ public class ReportPublisherFactory {
         CommandStatusReportPublisher.class);
     report2publisher.put(PipelineReportsProto.class,
             PipelineReportPublisher.class);
+    report2publisher.put(CRLStatusReport.class, CRLStatusReportPublisher.class);
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 36ed863..544e4c5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership.  The ASF
@@ -27,7 +27,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
@@ -73,11 +75,12 @@ public class DatanodeStateMachine implements Closeable {
   private final SCMConnectionManager connectionManager;
   private StateContext context;
   private final OzoneContainer container;
-  private DatanodeDetails datanodeDetails;
+  private final DatanodeCRLStore dnCRLStore;
+  private final DatanodeDetails datanodeDetails;
   private final CommandDispatcher commandDispatcher;
   private final ReportManager reportManager;
   private long commandsHandled;
-  private AtomicLong nextHB;
+  private final AtomicLong nextHB;
   private Thread stateMachineThread = null;
   private Thread cmdProcessThread = null;
   private final ReplicationSupervisor supervisor;
@@ -100,14 +103,17 @@ public class DatanodeStateMachine implements Closeable {
    *                     enabled
    */
   public DatanodeStateMachine(DatanodeDetails datanodeDetails,
-      ConfigurationSource conf, CertificateClient certClient,
-      HddsDatanodeStopService hddsDatanodeStopService) throws IOException {
+                              ConfigurationSource conf,
+                              CertificateClient certClient,
+                              HddsDatanodeStopService hddsDatanodeStopService,
+                              DatanodeCRLStore crlStore) throws IOException {
     DatanodeConfiguration dnConf =
         conf.getObject(DatanodeConfiguration.class);
 
     this.hddsDatanodeStopService = hddsDatanodeStopService;
     this.conf = conf;
     this.datanodeDetails = datanodeDetails;
+    this.dnCRLStore = crlStore;
     executorService = Executors.newFixedThreadPool(
         getEndPointTaskThreadPoolSize(),
         new ThreadFactoryBuilder()
@@ -162,6 +168,7 @@ public class DatanodeStateMachine implements Closeable {
         .addPublisherFor(ContainerReportsProto.class)
         .addPublisherFor(CommandStatusReportsProto.class)
         .addPublisherFor(PipelineReportsProto.class)
+        .addPublisherFor(CRLStatusReport.class)
         .build();
   }
 
@@ -210,6 +217,10 @@ public class DatanodeStateMachine implements Closeable {
     }
   }
 
+  public DatanodeCRLStore getDnCRLStore() {
+    return dnCRLStore;
+  }
+
   /**
    * Runs the state machine at a fixed frequency.
    */
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 3051638..5c3aaa9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -43,6 +43,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import com.google.protobuf.Descriptors.Descriptor;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -89,6 +90,9 @@ public class StateContext {
   @VisibleForTesting
   static final String INCREMENTAL_CONTAINER_REPORT_PROTO_NAME =
       IncrementalContainerReportProto.getDescriptor().getFullName();
+  @VisibleForTesting
+  static final String CRL_STATUS_REPORT_PROTO_NAME =
+      CRLStatusReport.getDescriptor().getFullName();
   // Accepted types of reports that can be queued to incrementalReportsQueue
   private static final Set<String> ACCEPTED_INCREMENTAL_REPORT_TYPE_SET =
       Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME,
@@ -107,6 +111,7 @@ public class StateContext {
   private final AtomicReference<GeneratedMessage> containerReports;
   private final AtomicReference<GeneratedMessage> nodeReport;
   private final AtomicReference<GeneratedMessage> pipelineReports;
+  private final AtomicReference<GeneratedMessage> crlStatusReport;
   // Incremental reports are queued in the map below
   private final Map<InetSocketAddress, List<GeneratedMessage>>
       incrementalReportsQueue;
@@ -134,12 +139,12 @@ public class StateContext {
    * real HB frequency after scm registration. With this method the
   * initial registration could be significantly faster.
    */
-  private AtomicLong heartbeatFrequency = new AtomicLong(2000);
+  private final AtomicLong heartbeatFrequency = new AtomicLong(2000);
 
   /**
    * Constructs a StateContext.
    *
-   * @param conf   - Configration
+   * @param conf   - Configuration
    * @param state  - State
    * @param parent Parent State Machine
    */
@@ -155,6 +160,7 @@ public class StateContext {
     containerReports = new AtomicReference<>();
     nodeReport = new AtomicReference<>();
     pipelineReports = new AtomicReference<>();
+    crlStatusReport = new AtomicReference<>();
     endpoints = new HashSet<>();
     containerActions = new HashMap<>();
     pipelineActions = new HashMap<>();
@@ -268,6 +274,8 @@ public class StateContext {
           incrementalReportsQueue.get(endpoint).add(report);
         }
       }
+    } else if (reportType.equals(CRL_STATUS_REPORT_PROTO_NAME)) {
+      crlStatusReport.set(report);
     } else {
       throw new IllegalArgumentException(
           "Unidentified report message type: " + reportType);
@@ -346,6 +354,10 @@ public class StateContext {
     if (report != null) {
       nonIncrementalReports.add(report);
     }
+    report = crlStatusReport.get();
+    if (report != null) {
+      nonIncrementalReports.add(report);
+    }
     return nonIncrementalReports;
   }
 
@@ -805,4 +817,9 @@ public class StateContext {
   public GeneratedMessage getPipelineReports() {
     return pipelineReports.get();
   }
+
+  @VisibleForTesting
+  public GeneratedMessage getCRLStatusReport() {
+    return crlStatusReport.get();
+  }
 }
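
Design note on the routing above: CRLStatusReport is a full snapshot rather
than an incremental report, so StateContext keeps it in an AtomicReference
and each new report simply replaces the previous one. A toy illustration of
that latest-wins behavior (not part of the patch; class name hypothetical):

import java.util.concurrent.atomic.AtomicReference;

public class LatestWinsSketch {
  public static void main(String[] args) {
    AtomicReference<Long> crlStatus = new AtomicReference<>();
    crlStatus.set(1L); // first snapshot queued for the heartbeat
    crlStatus.set(2L); // a newer snapshot replaces it
    System.out.println(crlStatus.get()); // 2 -- only the latest survives
  }
}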
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index 4743f54..82c9e6e 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership.  The ASF
@@ -153,7 +153,8 @@ public class TestDatanodeStateMachine {
   public void testStartStopDatanodeStateMachine() throws IOException,
       InterruptedException, TimeoutException {
     try (DatanodeStateMachine stateMachine =
-        new DatanodeStateMachine(getNewDatanodeDetails(), conf, null, null)) {
+        new DatanodeStateMachine(getNewDatanodeDetails(), conf, null, null,
+            null)) {
       stateMachine.startDaemon();
       SCMConnectionManager connectionManager =
           stateMachine.getConnectionManager();
@@ -216,7 +217,8 @@ public class TestDatanodeStateMachine {
     ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
 
     try (DatanodeStateMachine stateMachine =
-             new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
+             new DatanodeStateMachine(datanodeDetails, conf, null, null,
+                 null)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -338,7 +340,8 @@ public class TestDatanodeStateMachine {
     datanodeDetails.setPort(port);
 
     try (DatanodeStateMachine stateMachine =
-             new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
+             new DatanodeStateMachine(datanodeDetails, conf, null, null,
+                 null)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -401,7 +404,7 @@ public class TestDatanodeStateMachine {
       perTestConf.setStrings(entry.getKey(), entry.getValue());
       LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
       try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-          getNewDatanodeDetails(), perTestConf, null, null)) {
+          getNewDatanodeDetails(), perTestConf, null, null, null)) {
         DatanodeStateMachine.DatanodeStates currentState =
             stateMachine.getContext().getState();
         Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 291ab6e..4b25609 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -18,17 +18,26 @@
 package org.apache.hadoop.ozone.container.common.report;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Descriptors;
 import com.google.protobuf.GeneratedMessage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.HddsIdFactory;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -165,6 +174,44 @@ public class TestReportPublisher {
     executorService.shutdown();
   }
 
+  @Test
+  public void testCRLStatusReportPublisher() throws IOException {
+    StateContext dummyContext = Mockito.mock(StateContext.class);
+    DatanodeStateMachine dummyStateMachine =
+        Mockito.mock(DatanodeStateMachine.class);
+    ReportPublisher publisher = new CRLStatusReportPublisher();
+    DatanodeCRLStore dnCrlStore = Mockito.mock(DatanodeCRLStore.class);
+    when(dnCrlStore.getLatestCRLSequenceID()).thenReturn(3L);
+    List<CRLInfo> pendingCRLs = new ArrayList<>();
+    pendingCRLs.add(new CRLInfo.Builder()
+        .setCrlSequenceID(100L)
+        .build());
+    pendingCRLs.add(new CRLInfo.Builder()
+        .setCrlSequenceID(101L)
+        .build());
+    when(dnCrlStore.getPendingCRLs()).thenReturn(pendingCRLs);
+    when(dummyStateMachine.getDnCRLStore()).thenReturn(dnCrlStore);
+    when(dummyContext.getParent()).thenReturn(dummyStateMachine);
+    publisher.setConf(config);
+
+    ScheduledExecutorService executorService = HadoopExecutors
+        .newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("Unit test ReportManager Thread - %d").build());
+    publisher.init(dummyContext, executorService);
+    GeneratedMessage report =
+        ((CRLStatusReportPublisher) publisher).getReport();
+    Assert.assertNotNull(report);
+    for (Descriptors.FieldDescriptor descriptor :
+        report.getDescriptorForType().getFields()) {
+      if (descriptor.getNumber() ==
+          CRLStatusReport.RECEIVEDCRLID_FIELD_NUMBER) {
+        Assert.assertEquals(3L, report.getField(descriptor));
+      }
+    }
+    executorService.shutdown();
+  }
+
   /**
    * Get a datanode details.
    *
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
index e9a34c7..ea915f9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership.  The ASF
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -56,6 +57,16 @@ public class TestReportPublisherFactory {
   }
 
   @Test
+  public void testGetCRLStatusReportPublisher() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ReportPublisherFactory factory = new ReportPublisherFactory(conf);
+    ReportPublisher publisher = factory
+        .getPublisherFor(CRLStatusReport.class);
+    Assert.assertEquals(CRLStatusReportPublisher.class, publisher.getClass());
+    Assert.assertEquals(conf, publisher.getConf());
+  }
+
+  @Test
   public void testInvalidReportPublisher() {
     OzoneConfiguration conf = new OzoneConfiguration();
     ReportPublisherFactory factory = new ReportPublisherFactory(conf);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 046c433..d152f4d 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -126,6 +126,12 @@ public class TestStateContext {
       fail("Should throw exception when putting back unaccepted reports!");
     } catch (IllegalArgumentException ignored) {
     }
+    try {
+      ctx.putBackReports(Collections.singletonList(
+          newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)), scm1);
+      fail("Should throw exception when putting back unaccepted reports!");
+    } catch (IllegalArgumentException ignored) {
+    }
 
     // Case 3: Put back mixed types of incremental reports
 
@@ -153,7 +159,8 @@ public class TestStateContext {
       ctx.putBackReports(Arrays.asList(
           newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
           newMockReport(StateContext.NODE_REPORT_PROTO_NAME),
-          newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)
+          newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME),
+          newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)
       ), scm1);
       fail("Should throw exception when putting back unaccepted reports!");
     } catch (IllegalArgumentException ignored) {
@@ -190,46 +197,6 @@ public class TestStateContext {
 
     Map<String, Integer> expectedReportCount = new HashMap<>();
 
-    // Add a bunch of ContainerReports
-    batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
-    // Should only keep the latest one
-    expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
-    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
-    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
-
-    // Add a bunch of NodeReport
-    batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
-    // Should only keep the latest one
-    expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
-    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
-    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
-
-    // Add a bunch of PipelineReports
-    batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
-    // Should only keep the latest one
-    expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
-    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
-    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
-
-    // Add a bunch of PipelineReports
-    batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
-    // Should only keep the latest one
-    expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
-    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
-    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
-
-    // Add a bunch of CommandStatusReports
-    batchAddReports(ctx,
-        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
-    expectedReportCount.put(
-        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
-    // Should keep all of them
-    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
-    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
-    // getReports dequeues incremental reports
-    expectedReportCount.remove(
-        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
-
     // Add a bunch of IncrementalContainerReport
     batchAddReports(ctx,
         StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
@@ -321,12 +288,12 @@ public class TestStateContext {
   }
 
   private GeneratedMessage newMockReport(String messageType) {
-    GeneratedMessage pipelineReports = mock(GeneratedMessage.class);
-    when(pipelineReports.getDescriptorForType()).thenReturn(
+    GeneratedMessage report = mock(GeneratedMessage.class);
+    when(report.getDescriptorForType()).thenReturn(
         mock(Descriptor.class));
-    when(pipelineReports.getDescriptorForType().getFullName()).thenReturn(
+    when(report.getDescriptorForType().getFullName()).thenReturn(
         messageType);
-    return pipelineReports;
+    return report;
   }
 
   @Test
@@ -340,11 +307,8 @@ public class TestStateContext {
     InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
     InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
 
-    GeneratedMessage generatedMessage = mock(GeneratedMessage.class);
-    when(generatedMessage.getDescriptorForType()).thenReturn(
-        mock(Descriptor.class));
-    when(generatedMessage.getDescriptorForType().getFullName()).thenReturn(
-        "hadoop.hdds.CommandStatusReportsProto");
+    GeneratedMessage generatedMessage =
+        newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
 
     // Try to add report with zero endpoint. Should not be stored.
     stateContext.addReport(generatedMessage);
@@ -588,6 +552,7 @@ public class TestStateContext {
     batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
     batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
     batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
+    batchAddReports(ctx, StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128);
     batchAddReports(ctx,
         StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
 
@@ -595,9 +560,12 @@ public class TestStateContext {
     expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
     expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
     expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
+    expectedReportCount.put(StateContext.CRL_STATUS_REPORT_PROTO_NAME, 1);
    // Should keep less than or equal to maxLimit depending on other reports' size.
+    // Here, the incremental container reports count must be 96
+    // (100 - 4 non-incremental reports)
     expectedReportCount.put(
-        StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 97);
+        StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 96);
     checkReportCount(ctx.getReports(scm1, 100), expectedReportCount);
     checkReportCount(ctx.getReports(scm2, 100), expectedReportCount);
   }
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index e28cc88..a6b5794 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -404,6 +404,18 @@ message SetNodeOperationalStateCommandProto {
   required  int64 stateExpiryEpochSeconds = 3;
 }
 
+message CRLStatusReport {
+  required int64 receivedCrlId=1;
+  repeated int64 pendingCrlIds=2;
+}
+
+/**
+ * This command asks the datanode to process a new CRL.
+ */
+message ProcessCRLCommandProto {
+  required CRLInfoProto crlInfo = 1;
+}
+
 /**
  * Protocol used from a datanode to StorageContainerManager.
  *
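
Both fields of the new CRLStatusReport message are plain int64s, so it
round-trips through the ordinary protobuf wire format. A hedged sketch of
that round trip (not part of the patch; class name hypothetical):

import com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;

public class CrlStatusWireSketch {
  public static void main(String[] args)
      throws InvalidProtocolBufferException {
    CRLStatusReport in = CRLStatusReport.newBuilder()
        .setReceivedCrlId(7L)
        .addPendingCrlIds(8L)
        .build();
    byte[] wire = in.toByteArray();
    CRLStatusReport out = CRLStatusReport.parseFrom(wire);
    System.out.println(out.getReceivedCrlId()); // 7
  }
}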
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 9cc263d..5a211c6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -465,7 +465,7 @@ public class TestEndPoint {
 
    // Create a datanode state machine for stateContext used by endpoint task
     try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-        randomDatanodeDetails(), conf, null, null);
+        randomDatanodeDetails(), conf, null, null, null);
         EndpointStateMachine rpcEndPoint =
             createEndpoint(conf, scmAddress, rpcTimeout)) {
       HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index d49710d..8c51f9d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -221,7 +221,7 @@ public class TestMiniOzoneCluster {
 
       for (int i = 0; i < 3; i++) {
         stateMachines.add(new DatanodeStateMachine(
-            randomDatanodeDetails(), ozoneConf, null, null));
+            randomDatanodeDetails(), ozoneConf, null, null, null));
       }
 
       //we need to start all the servers to get the fix ports
@@ -266,11 +266,11 @@ public class TestMiniOzoneCluster {
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
     try (
         DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            randomDatanodeDetails(), ozoneConf,  null, null);
+            randomDatanodeDetails(), ozoneConf,  null, null, null);
         DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            randomDatanodeDetails(), ozoneConf,  null, null);
+            randomDatanodeDetails(), ozoneConf,  null, null, null);
         DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            randomDatanodeDetails(), ozoneConf,  null, null);
+            randomDatanodeDetails(), ozoneConf,  null, null, null);
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
       assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
