You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2019/04/26 21:25:44 UTC

[hadoop] branch trunk updated: HDDS-1456. Stop the datanode, when any datanode statemachine state is… (#769)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 43b2a4b  HDDS-1456. Stop the datanode, when any datanode statemachine state is… (#769)
43b2a4b is described below

commit 43b2a4b77bfdd7dec66c92bf59a70f0aca437722
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Fri Apr 26 14:25:34 2019 -0700

    HDDS-1456. Stop the datanode, when any datanode statemachine state is… (#769)
---
 .../apache/hadoop/ozone/HddsDatanodeService.java   | 46 +++++++++++++---------
 .../hadoop/ozone/HddsDatanodeStopService.java      | 27 +++++++++++++
 .../common/statemachine/DatanodeStateMachine.java  | 14 ++++++-
 .../common/statemachine/StateContext.java          | 25 ++++++++++++
 .../ozone/container/common/volume/VolumeSet.java   | 10 +++--
 .../container/common/TestDatanodeStateMachine.java |  8 ++--
 .../common/volume/TestVolumeSetDiskChecks.java     |  9 ++++-
 .../ozone/container/common/TestEndPoint.java       |  6 ++-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  | 12 +++---
 9 files changed, 121 insertions(+), 36 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index df2e167..b41da83 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -56,6 +56,7 @@ import java.security.cert.CertificateException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec.getX509Certificate;
 import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
@@ -84,6 +85,7 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
   private HddsDatanodeHttpServer httpServer;
   private boolean printBanner;
   private String[] args;
+  private final AtomicBoolean isStopped = new AtomicBoolean(false);
 
   public HddsDatanodeService(boolean printBanner, String[] args) {
     this.printBanner = printBanner;
@@ -209,7 +211,7 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
           initializeCertificateClient(conf);
         }
         datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
-            dnCertClient);
+            dnCertClient, this::terminateDatanode);
         try {
           httpServer = new HddsDatanodeHttpServer(conf);
           httpServer.start();
@@ -421,29 +423,37 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
     }
   }
 
+  /** Called by DatanodeStateMachine: stop() services, then terminate(1). */
+  public void terminateDatanode() {
+    stop();
+    terminate(1);
+  }
+
   @Override
   public void stop() {
-    if (plugins != null) {
-      for (ServicePlugin plugin : plugins) {
-        try {
-          plugin.stop();
-          LOG.info("Stopped plug-in {}", plugin);
-        } catch (Throwable t) {
-          LOG.warn("ServicePlugin {} could not be stopped", plugin, t);
+    // Atomic check-and-set: only the first caller runs the shutdown steps.
+    if (isStopped.compareAndSet(false, true)) {
+      if (plugins != null) {
+        for (ServicePlugin plugin : plugins) {
+          try {
+            plugin.stop();
+            LOG.info("Stopped plug-in {}", plugin);
+          } catch (Throwable t) {
+            LOG.warn("ServicePlugin {} could not be stopped", plugin, t);
+          }
         }
       }
-    }
-    if (datanodeStateMachine != null) {
-      datanodeStateMachine.stopDaemon();
-    }
-    if (httpServer != null) {
-      try {
-        httpServer.stop();
-      } catch (Exception e) {
-        LOG.error("Stopping HttpServer is failed.", e);
+      if (datanodeStateMachine != null) {
+        datanodeStateMachine.stopDaemon();
+      }
+      if (httpServer != null) {
+        try {
+          httpServer.stop();
+        } catch (Exception e) {
+          LOG.error("Stopping HttpServer is failed.", e);
+        }
       }
     }
-
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeStopService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeStopService.java
new file mode 100644
index 0000000..02c1431
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeStopService.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+/** Callback that DatanodeStateMachine uses to stop the HddsDatanodeService. */
+@FunctionalInterface
+public interface HddsDatanodeStopService {
+
+  /** Stops the HddsDatanodeService. */
+  void stopService();
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 69782ef..0119d23 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.ozone.HddsDatanodeStopService;
 import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CloseContainerCommandHandler;
@@ -84,6 +85,7 @@ public class DatanodeStateMachine implements Closeable {
 
   private JvmPauseMonitor jvmPauseMonitor;
   private CertificateClient dnCertClient;
+  private final HddsDatanodeStopService hddsDatanodeStopService;
 
   /**
    * Constructs a a datanode state machine.
@@ -93,7 +95,9 @@ public class DatanodeStateMachine implements Closeable {
    *                     enabled
    */
   public DatanodeStateMachine(DatanodeDetails datanodeDetails,
-      Configuration conf, CertificateClient certClient) throws IOException {
+      Configuration conf, CertificateClient certClient,
+      HddsDatanodeStopService hddsDatanodeStopService) throws IOException {
+    this.hddsDatanodeStopService = hddsDatanodeStopService;
     this.conf = conf;
     this.datanodeDetails = datanodeDetails;
     executorService = HadoopExecutors.newCachedThreadPool(
@@ -195,6 +199,14 @@ public class DatanodeStateMachine implements Closeable {
         LOG.error("Unable to finish the execution.", e);
       }
     }
+
+    // On a critical error (state set to SHUTDOWN) also stop the datanode.
+    if (context.getShutdownOnError()) {
+      LOG.error("DatanodeStateMachine Shutdown due to an critical error");
+      if (hddsDatanodeStopService != null) {
+        hddsDatanodeStopService.stopService();
+      }
+    }
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 56151f8..2c01f3a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -73,6 +73,7 @@ public class StateContext {
   private final Queue<ContainerAction> containerActions;
   private final Queue<PipelineAction> pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;
+  private boolean shutdownOnError = false;
 
   /**
    * Starting with a 2 sec heartbeat frequency which will be updated to the
@@ -153,6 +154,22 @@ public class StateContext {
   }
 
   /**
+   * Sets the shutdownOnError flag. Called when a DatanodeState task leaves
+   * the state machine in SHUTDOWN, which is treated as a critical error.
+   * @param value true to record that SHUTDOWN was caused by an error
+   */
+  private void setShutdownOnError(boolean value) {
+    this.shutdownOnError = value;
+  }
+
+  /**
+   * @return true if the state machine was shut down due to an error
+   */
+  public boolean getShutdownOnError() {
+    return shutdownOnError;
+  }
+
+  /**
    * Adds the report to report queue.
    *
    * @param report report to be added
@@ -367,6 +384,14 @@ public class StateContext {
         }
         this.setState(newState);
       }
+
+      if (this.state == DatanodeStateMachine.DatanodeStates.SHUTDOWN) {
+        LOG.error("Critical error occurred in StateMachine, setting " +
+            "shutDownMachine");
+        // Treat SHUTDOWN here as a critical error so the datanode terminates.
+        // NOTE(review): confirm a normal stopDaemon() cannot reach this path.
+        setShutdownOnError(true);
+      }
     }
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
index 6fba4fb..875e96a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
@@ -167,8 +167,6 @@ public class VolumeSet {
 
         checkAndSetClusterID(hddsVolume.getClusterID());
 
-        volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
-        volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
         LOG.info("Added Volume : {} to VolumeSet",
             hddsVolume.getHddsRootDir().getPath());
 
@@ -177,6 +175,8 @@ public class VolumeSet {
           throw new IOException("Failed to create HDDS storage dir " +
               hddsVolume.getHddsRootDir());
         }
+        volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
+        volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
       } catch (IOException e) {
         HddsVolume volume = new HddsVolume.Builder(locationString)
             .failedVolume(true).build();
@@ -185,12 +185,14 @@ public class VolumeSet {
       }
     }
 
-    checkAllVolumes();
-
+    // Volumes whose initialization threw never reach volumeMap.put above, so
+    // an empty map means no location initialized successfully; fail fast.
     if (volumeMap.size() == 0) {
       throw new DiskOutOfSpaceException("No storage locations configured");
     }
 
+    checkAllVolumes();
+
     // Ensure volume threads are stopped and scm df is saved during shutdown.
     shutdownHook = () -> {
       saveVolumeSetUsed();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index e9c5e3e..9840f01 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -160,7 +160,7 @@ public class TestDatanodeStateMachine {
   public void testStartStopDatanodeStateMachine() throws IOException,
       InterruptedException, TimeoutException {
     try (DatanodeStateMachine stateMachine =
-        new DatanodeStateMachine(getNewDatanodeDetails(), conf, null)) {
+        new DatanodeStateMachine(getNewDatanodeDetails(), conf, null, null)) {
       stateMachine.startDaemon();
       SCMConnectionManager connectionManager =
           stateMachine.getConnectionManager();
@@ -222,7 +222,7 @@ public class TestDatanodeStateMachine {
     ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
 
     try (DatanodeStateMachine stateMachine =
-             new DatanodeStateMachine(datanodeDetails, conf, null)) {
+             new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -343,7 +343,7 @@ public class TestDatanodeStateMachine {
     datanodeDetails.setPort(port);
 
     try (DatanodeStateMachine stateMachine =
-             new DatanodeStateMachine(datanodeDetails, conf, null)) {
+             new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -406,7 +406,7 @@ public class TestDatanodeStateMachine {
       perTestConf.setStrings(entry.getKey(), entry.getValue());
       LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
       try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-          getNewDatanodeDetails(), perTestConf, null)) {
+          getNewDatanodeDetails(), perTestConf, null, null)) {
         DatanodeStateMachine.DatanodeStates currentState =
             stateMachine.getContext().getState();
         Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
index 5cb218c..472bb98 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
@@ -38,6 +38,8 @@ import org.apache.curator.shaded.com.google.common.collect.ImmutableSet;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.hamcrest.CoreMatchers.is;
 import org.junit.After;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import org.junit.Rule;
@@ -125,14 +127,14 @@ public class TestVolumeSetDiskChecks {
   }
 
   /**
-   * Verify that initialization fails if all volumes are bad.
+   * Verify that all volumes land in the failed list when every volume is bad.
    */
   @Test
   public void testAllVolumesAreBad() throws IOException {
     final int numVolumes = 5;
 
     conf = getConfWithDataNodeDirs(numVolumes);
-    thrown.expect(IOException.class);
+
     final VolumeSet volumeSet = new VolumeSet(
         UUID.randomUUID().toString(), conf) {
       @Override
@@ -141,6 +143,9 @@ public class TestVolumeSetDiskChecks {
         return new DummyChecker(configuration, new Timer(), numVolumes);
       }
     };
+
+    assertEquals(numVolumes, volumeSet.getFailedVolumesList().size());
+    assertEquals(0, volumeSet.getVolumesList().size());
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 4b03474..6b493ed 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -175,6 +175,10 @@ public class TestEndPoint {
   @Test
   public void testCheckVersionResponse() throws Exception {
     OzoneConfiguration conf = SCMTestUtils.getConf();
+    // Let the container IPC and Ratis servers bind to random ports.
+    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
+    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
+        true);
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, 1000)) {
       GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
@@ -478,7 +482,7 @@ public class TestEndPoint {
 
     // Create a datanode state machine for stateConext used by endpoint task
     try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-        TestUtils.randomDatanodeDetails(), conf, null);
+        TestUtils.randomDatanodeDetails(), conf, null, null);
          EndpointStateMachine rpcEndPoint =
             createEndpoint(conf, scmAddress, rpcTimeout)) {
         HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index cd2c381..a07457a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -175,11 +175,11 @@ public class TestMiniOzoneCluster {
         true);
     try (
         DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf,  null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf,  null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf,  null)
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null)
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
       assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
@@ -198,11 +198,11 @@ public class TestMiniOzoneCluster {
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
     try (
         DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf,  null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf,  null);
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
         DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf,  null)
+            TestUtils.randomDatanodeDetails(), ozoneConf, null, null)
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
       assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org