You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by xy...@apache.org on 2021/06/02 21:16:12 UTC
[ozone] branch master updated: HDDS-4483. Datanodes should send
last processed CRL sequence ID in heartbeats (#2285)
This is an automated email from the ASF dual-hosted git repository.
xyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 095d09b HDDS-4483. Datanodes should send last processed CRL sequence ID in heartbeats (#2285)
095d09b is described below
commit 095d09b088b24c2f74f01bd50c99aed2e1a9370e
Author: Vivek Ratnavel Subramanian <vi...@gmail.com>
AuthorDate: Wed Jun 2 14:15:55 2021 -0700
HDDS-4483. Datanodes should send last processed CRL sequence ID in heartbeats (#2285)
---
.../org/apache/hadoop/hdds/HddsConfigKeys.java | 8 ++-
.../common/src/main/resources/ozone-default.xml | 8 +++
.../apache/hadoop/ozone/HddsDatanodeService.java | 2 +-
.../common/report/CRLStatusReportPublisher.java | 82 ++++++++++++++++++++++
.../common/report/ReportPublisherFactory.java | 2 +
.../common/statemachine/DatanodeStateMachine.java | 21 ++++--
.../common/statemachine/StateContext.java | 21 +++++-
.../container/common/TestDatanodeStateMachine.java | 13 ++--
.../common/report/TestReportPublisher.java | 47 +++++++++++++
.../common/report/TestReportPublisherFactory.java | 13 +++-
.../common/statemachine/TestStateContext.java | 70 +++++-------------
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 12 ++++
.../ozone/container/common/TestEndPoint.java | 2 +-
.../apache/hadoop/ozone/TestMiniOzoneCluster.java | 8 +--
14 files changed, 237 insertions(+), 72 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 62a6012..e05eda7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -34,10 +34,14 @@ public final class HddsConfigKeys {
"hdds.container.report.interval";
public static final String HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT =
"60s";
+ public static final String HDDS_CRL_STATUS_REPORT_INTERVAL =
+ "hdds.crl.status.report.interval";
+ public static final String HDDS_CRL_STATUS_REPORT_INTERVAL_DEFAULT =
+ "60s";
public static final String HDDS_PIPELINE_REPORT_INTERVAL =
- "hdds.pipeline.report.interval";
+ "hdds.pipeline.report.interval";
public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT =
- "60s";
+ "60s";
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
"hdds.command.status.report.interval";
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2014aea..027681b 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -316,6 +316,14 @@
defined with postfix (ns,ms,s,m,h,d)</description>
</property>
<property>
+ <name>hdds.crl.status.report.interval</name>
+ <value>60000ms</value>
+ <tag>OZONE, SECURITY, MANAGEMENT</tag>
+ <description>Time interval of the datanode to send CRL status report. Each
+ datanode periodically sends CRL status report to SCM. Unit could be
+ defined with postfix (ns,ms,s,m,h,d)</description>
+ </property>
+ <property>
<name>hdds.pipeline.report.interval</name>
<value>60000ms</value>
<tag>OZONE, PIPELINE, MANAGEMENT</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index baa5889..1710090 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -253,7 +253,7 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
initializeCertificateClient(conf);
}
datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
- dnCertClient, this::terminateDatanode);
+ dnCertClient, this::terminateDatanode, dnCRLStore);
try {
httpServer = new HddsDatanodeHttpServer(conf);
httpServer.start();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CRLStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CRLStatusReportPublisher.java
new file mode 100644
index 0000000..9b7cf9a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CRLStatusReportPublisher.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CRL_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CRL_STATUS_REPORT_INTERVAL_DEFAULT;
+
+/**
+ * Publishes CRLStatusReport which will be sent to SCM as part of heartbeat.
+ * CRLStatusReport consist of the following information:
+ * - receivedCRLId : The latest processed CRL Sequence ID.
+ * - pendingCRLIds : The list of CRL IDs that are still pending in the
+ * queue to be processed in the future.
+ */
+public class CRLStatusReportPublisher extends
+ ReportPublisher<CRLStatusReport> {
+
+ private Long crlStatusReportInterval = null;
+
+ @Override
+ protected long getReportFrequency() {
+ if (crlStatusReportInterval == null) {
+ crlStatusReportInterval = getConf().getTimeDuration(
+ HDDS_CRL_STATUS_REPORT_INTERVAL,
+ HDDS_CRL_STATUS_REPORT_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval(
+ getConf());
+
+ Preconditions.checkState(
+ heartbeatFrequency <= crlStatusReportInterval,
+ HDDS_CRL_STATUS_REPORT_INTERVAL +
+ " cannot be configured lower than heartbeat frequency.");
+ }
+ return crlStatusReportInterval;
+ }
+
+ @Override
+ protected CRLStatusReport getReport() throws IOException {
+
+ CRLStatusReport.Builder builder = CRLStatusReport.newBuilder();
+
+ DatanodeCRLStore dnCRLStore = this.getContext().getParent().getDnCRLStore();
+
+ builder.setReceivedCrlId(dnCRLStore.getLatestCRLSequenceID());
+ if (dnCRLStore.getPendingCRLs().size() > 0) {
+ List<Long> pendingCRLIds =
+ dnCRLStore.getPendingCRLs().stream().map(CRLInfo::getCrlSequenceID)
+ .collect(Collectors.toList());
+ builder.addAllPendingCrlIds(pendingCRLIds);
+ }
+
+ return builder.build();
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index 4533691..82a3c41 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
@@ -53,6 +54,7 @@ public class ReportPublisherFactory {
CommandStatusReportPublisher.class);
report2publisher.put(PipelineReportsProto.class,
PipelineReportPublisher.class);
+ report2publisher.put(CRLStatusReport.class, CRLStatusReportPublisher.class);
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 36ed863..544e4c5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
@@ -27,7 +27,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
@@ -73,11 +75,12 @@ public class DatanodeStateMachine implements Closeable {
private final SCMConnectionManager connectionManager;
private StateContext context;
private final OzoneContainer container;
- private DatanodeDetails datanodeDetails;
+ private final DatanodeCRLStore dnCRLStore;
+ private final DatanodeDetails datanodeDetails;
private final CommandDispatcher commandDispatcher;
private final ReportManager reportManager;
private long commandsHandled;
- private AtomicLong nextHB;
+ private final AtomicLong nextHB;
private Thread stateMachineThread = null;
private Thread cmdProcessThread = null;
private final ReplicationSupervisor supervisor;
@@ -100,14 +103,17 @@ public class DatanodeStateMachine implements Closeable {
* enabled
*/
public DatanodeStateMachine(DatanodeDetails datanodeDetails,
- ConfigurationSource conf, CertificateClient certClient,
- HddsDatanodeStopService hddsDatanodeStopService) throws IOException {
+ ConfigurationSource conf,
+ CertificateClient certClient,
+ HddsDatanodeStopService hddsDatanodeStopService,
+ DatanodeCRLStore crlStore) throws IOException {
DatanodeConfiguration dnConf =
conf.getObject(DatanodeConfiguration.class);
this.hddsDatanodeStopService = hddsDatanodeStopService;
this.conf = conf;
this.datanodeDetails = datanodeDetails;
+ this.dnCRLStore = crlStore;
executorService = Executors.newFixedThreadPool(
getEndPointTaskThreadPoolSize(),
new ThreadFactoryBuilder()
@@ -162,6 +168,7 @@ public class DatanodeStateMachine implements Closeable {
.addPublisherFor(ContainerReportsProto.class)
.addPublisherFor(CommandStatusReportsProto.class)
.addPublisherFor(PipelineReportsProto.class)
+ .addPublisherFor(CRLStatusReport.class)
.build();
}
@@ -210,6 +217,10 @@ public class DatanodeStateMachine implements Closeable {
}
}
+ public DatanodeCRLStore getDnCRLStore() {
+ return dnCRLStore;
+ }
+
/**
* Runs the state machine at a fixed frequency.
*/
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 3051638..5c3aaa9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -43,6 +43,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.protobuf.Descriptors.Descriptor;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -89,6 +90,9 @@ public class StateContext {
@VisibleForTesting
static final String INCREMENTAL_CONTAINER_REPORT_PROTO_NAME =
IncrementalContainerReportProto.getDescriptor().getFullName();
+ @VisibleForTesting
+ static final String CRL_STATUS_REPORT_PROTO_NAME =
+ CRLStatusReport.getDescriptor().getFullName();
// Accepted types of reports that can be queued to incrementalReportsQueue
private static final Set<String> ACCEPTED_INCREMENTAL_REPORT_TYPE_SET =
Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME,
@@ -107,6 +111,7 @@ public class StateContext {
private final AtomicReference<GeneratedMessage> containerReports;
private final AtomicReference<GeneratedMessage> nodeReport;
private final AtomicReference<GeneratedMessage> pipelineReports;
+ private final AtomicReference<GeneratedMessage> crlStatusReport;
// Incremental reports are queued in the map below
private final Map<InetSocketAddress, List<GeneratedMessage>>
incrementalReportsQueue;
@@ -134,12 +139,12 @@ public class StateContext {
* real HB frequency after scm registration. With this method the
* initial registration could be significant faster.
*/
- private AtomicLong heartbeatFrequency = new AtomicLong(2000);
+ private final AtomicLong heartbeatFrequency = new AtomicLong(2000);
/**
* Constructs a StateContext.
*
- * @param conf - Configration
+ * @param conf - Configuration
* @param state - State
* @param parent Parent State Machine
*/
@@ -155,6 +160,7 @@ public class StateContext {
containerReports = new AtomicReference<>();
nodeReport = new AtomicReference<>();
pipelineReports = new AtomicReference<>();
+ crlStatusReport = new AtomicReference<>();
endpoints = new HashSet<>();
containerActions = new HashMap<>();
pipelineActions = new HashMap<>();
@@ -268,6 +274,8 @@ public class StateContext {
incrementalReportsQueue.get(endpoint).add(report);
}
}
+ } else if(reportType.equals(CRL_STATUS_REPORT_PROTO_NAME)) {
+ crlStatusReport.set(report);
} else {
throw new IllegalArgumentException(
"Unidentified report message type: " + reportType);
@@ -346,6 +354,10 @@ public class StateContext {
if (report != null) {
nonIncrementalReports.add(report);
}
+ report = crlStatusReport.get();
+ if (report != null) {
+ nonIncrementalReports.add(report);
+ }
return nonIncrementalReports;
}
@@ -805,4 +817,9 @@ public class StateContext {
public GeneratedMessage getPipelineReports() {
return pipelineReports.get();
}
+
+ @VisibleForTesting
+ public GeneratedMessage getCRLStatusReport() {
+ return crlStatusReport.get();
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index 4743f54..82c9e6e 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
@@ -153,7 +153,8 @@ public class TestDatanodeStateMachine {
public void testStartStopDatanodeStateMachine() throws IOException,
InterruptedException, TimeoutException {
try (DatanodeStateMachine stateMachine =
- new DatanodeStateMachine(getNewDatanodeDetails(), conf, null, null)) {
+ new DatanodeStateMachine(getNewDatanodeDetails(), conf, null, null,
+ null)) {
stateMachine.startDaemon();
SCMConnectionManager connectionManager =
stateMachine.getConnectionManager();
@@ -216,7 +217,8 @@ public class TestDatanodeStateMachine {
ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
try (DatanodeStateMachine stateMachine =
- new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
+ new DatanodeStateMachine(datanodeDetails, conf, null, null,
+ null)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -338,7 +340,8 @@ public class TestDatanodeStateMachine {
datanodeDetails.setPort(port);
try (DatanodeStateMachine stateMachine =
- new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
+ new DatanodeStateMachine(datanodeDetails, conf, null, null,
+ null)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -401,7 +404,7 @@ public class TestDatanodeStateMachine {
perTestConf.setStrings(entry.getKey(), entry.getValue());
LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
- getNewDatanodeDetails(), perTestConf, null, null)) {
+ getNewDatanodeDetails(), perTestConf, null, null, null)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 291ab6e..4b25609 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -18,17 +18,26 @@
package org.apache.hadoop.ozone.container.common.report;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.HddsIdFactory;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -165,6 +174,44 @@ public class TestReportPublisher {
executorService.shutdown();
}
+ @Test
+ public void testCRLStatusReportPublisher() throws IOException {
+ StateContext dummyContext = Mockito.mock(StateContext.class);
+ DatanodeStateMachine dummyStateMachine =
+ Mockito.mock(DatanodeStateMachine.class);
+ ReportPublisher publisher = new CRLStatusReportPublisher();
+ DatanodeCRLStore dnCrlStore = Mockito.mock(DatanodeCRLStore.class);
+ when(dnCrlStore.getLatestCRLSequenceID()).thenReturn(3L);
+ List<CRLInfo> pendingCRLs = new ArrayList<>();
+ pendingCRLs.add(new CRLInfo.Builder()
+ .setCrlSequenceID(100L)
+ .build());
+ pendingCRLs.add(new CRLInfo.Builder()
+ .setCrlSequenceID(101L)
+ .build());
+ when(dnCrlStore.getPendingCRLs()).thenReturn(pendingCRLs);
+ when(dummyStateMachine.getDnCRLStore()).thenReturn(dnCrlStore);
+ when(dummyContext.getParent()).thenReturn(dummyStateMachine);
+ publisher.setConf(config);
+
+ ScheduledExecutorService executorService = HadoopExecutors
+ .newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Unit test ReportManager Thread - %d").build());
+ publisher.init(dummyContext, executorService);
+ GeneratedMessage report =
+ ((CRLStatusReportPublisher) publisher).getReport();
+ Assert.assertNotNull(report);
+ for(Descriptors.FieldDescriptor descriptor :
+ report.getDescriptorForType().getFields()) {
+ if (descriptor.getNumber() ==
+ CRLStatusReport.RECEIVEDCRLID_FIELD_NUMBER) {
+ Assert.assertEquals(3L, report.getField(descriptor));
+ }
+ }
+ executorService.shutdown();
+ }
+
/**
* Get a datanode details.
*
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
index e9a34c7..ea915f9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
import org.junit.Assert;
import org.junit.Rule;
@@ -56,6 +57,16 @@ public class TestReportPublisherFactory {
}
@Test
+ public void testGetCRLStatusReportPublisher() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ ReportPublisherFactory factory = new ReportPublisherFactory(conf);
+ ReportPublisher publisher = factory
+ .getPublisherFor(CRLStatusReport.class);
+ Assert.assertEquals(CRLStatusReportPublisher.class, publisher.getClass());
+ Assert.assertEquals(conf, publisher.getConf());
+ }
+
+ @Test
public void testInvalidReportPublisher() {
OzoneConfiguration conf = new OzoneConfiguration();
ReportPublisherFactory factory = new ReportPublisherFactory(conf);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 046c433..d152f4d 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -126,6 +126,12 @@ public class TestStateContext {
fail("Should throw exception when putting back unaccepted reports!");
} catch (IllegalArgumentException ignored) {
}
+ try {
+ ctx.putBackReports(Collections.singletonList(
+ newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)), scm1);
+ fail("Should throw exception when putting back unaccepted reports!");
+ } catch (IllegalArgumentException ignored) {
+ }
// Case 3: Put back mixed types of incremental reports
@@ -153,7 +159,8 @@ public class TestStateContext {
ctx.putBackReports(Arrays.asList(
newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
newMockReport(StateContext.NODE_REPORT_PROTO_NAME),
- newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)
+ newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME),
+ newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)
), scm1);
fail("Should throw exception when putting back unaccepted reports!");
} catch (IllegalArgumentException ignored) {
@@ -190,46 +197,6 @@ public class TestStateContext {
Map<String, Integer> expectedReportCount = new HashMap<>();
- // Add a bunch of ContainerReports
- batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
- // Should only keep the latest one
- expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
- checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
- checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
-
- // Add a bunch of NodeReport
- batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
- // Should only keep the latest one
- expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
- checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
- checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
-
- // Add a bunch of PipelineReports
- batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
- // Should only keep the latest one
- expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
- checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
- checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
-
- // Add a bunch of PipelineReports
- batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
- // Should only keep the latest one
- expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
- checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
- checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
-
- // Add a bunch of CommandStatusReports
- batchAddReports(ctx,
- StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
- expectedReportCount.put(
- StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
- // Should keep all of them
- checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
- checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
- // getReports dequeues incremental reports
- expectedReportCount.remove(
- StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
-
// Add a bunch of IncrementalContainerReport
batchAddReports(ctx,
StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
@@ -321,12 +288,12 @@ public class TestStateContext {
}
private GeneratedMessage newMockReport(String messageType) {
- GeneratedMessage pipelineReports = mock(GeneratedMessage.class);
- when(pipelineReports.getDescriptorForType()).thenReturn(
+ GeneratedMessage report = mock(GeneratedMessage.class);
+ when(report.getDescriptorForType()).thenReturn(
mock(Descriptor.class));
- when(pipelineReports.getDescriptorForType().getFullName()).thenReturn(
+ when(report.getDescriptorForType().getFullName()).thenReturn(
messageType);
- return pipelineReports;
+ return report;
}
@Test
@@ -340,11 +307,8 @@ public class TestStateContext {
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
- GeneratedMessage generatedMessage = mock(GeneratedMessage.class);
- when(generatedMessage.getDescriptorForType()).thenReturn(
- mock(Descriptor.class));
- when(generatedMessage.getDescriptorForType().getFullName()).thenReturn(
- "hadoop.hdds.CommandStatusReportsProto");
+ GeneratedMessage generatedMessage =
+ newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
// Try to add report with zero endpoint. Should not be stored.
stateContext.addReport(generatedMessage);
@@ -588,6 +552,7 @@ public class TestStateContext {
batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
+ batchAddReports(ctx, StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128);
batchAddReports(ctx,
StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
@@ -595,9 +560,12 @@ public class TestStateContext {
expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
+ expectedReportCount.put(StateContext.CRL_STATUS_REPORT_PROTO_NAME, 1);
// Should keep less or equal than maxLimit depending on other reports' size.
+ // Here, the incremental container reports count must be 96
+ // (100 - 4 non-incremental reports)
expectedReportCount.put(
- StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 97);
+ StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 96);
checkReportCount(ctx.getReports(scm1, 100), expectedReportCount);
checkReportCount(ctx.getReports(scm2, 100), expectedReportCount);
}
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index e28cc88..a6b5794 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -404,6 +404,18 @@ message SetNodeOperationalStateCommandProto {
required int64 stateExpiryEpochSeconds = 3;
}
+message CRLStatusReport {
+ required int64 receivedCrlId=1;
+ repeated int64 pendingCrlIds=2;
+}
+
+/**
+ * This command asks the datanode to process a new CRL.
+ */
+message ProcessCRLCommandProto {
+ required CRLInfoProto crlInfo = 1;
+}
+
/**
* Protocol used from a datanode to StorageContainerManager.
*
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 9cc263d..5a211c6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -465,7 +465,7 @@ public class TestEndPoint {
// Create a datanode state machine for stateConext used by endpoint task
try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
- randomDatanodeDetails(), conf, null, null);
+ randomDatanodeDetails(), conf, null, null, null);
EndpointStateMachine rpcEndPoint =
createEndpoint(conf, scmAddress, rpcTimeout)) {
HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index d49710d..8c51f9d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -221,7 +221,7 @@ public class TestMiniOzoneCluster {
for (int i = 0; i < 3; i++) {
stateMachines.add(new DatanodeStateMachine(
- randomDatanodeDetails(), ozoneConf, null, null));
+ randomDatanodeDetails(), ozoneConf, null, null, null));
}
//we need to start all the servers to get the fix ports
@@ -266,11 +266,11 @@ public class TestMiniOzoneCluster {
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
try (
DatanodeStateMachine sm1 = new DatanodeStateMachine(
- randomDatanodeDetails(), ozoneConf, null, null);
+ randomDatanodeDetails(), ozoneConf, null, null, null);
DatanodeStateMachine sm2 = new DatanodeStateMachine(
- randomDatanodeDetails(), ozoneConf, null, null);
+ randomDatanodeDetails(), ozoneConf, null, null, null);
DatanodeStateMachine sm3 = new DatanodeStateMachine(
- randomDatanodeDetails(), ozoneConf, null, null);
+ randomDatanodeDetails(), ozoneConf, null, null, null);
) {
HashSet<Integer> ports = new HashSet<Integer>();
assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org