Posted to common-commits@hadoop.apache.org by in...@apache.org on 2018/07/25 01:32:12 UTC

[29/50] hadoop git commit: HDDS-249. Fail if multiple SCM IDs on the DataNode and add SCM ID check after version request. Contributed by Bharat Viswanadham.

HDDS-249. Fail if multiple SCM IDs on the DataNode and add SCM ID check after version request. Contributed by Bharat Viswanadham.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9fa9e301
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9fa9e301
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9fa9e301

Branch: refs/heads/HADOOP-15461
Commit: 9fa9e301b0471f38530a3cb596b00064436d311d
Parents: 993ec02
Author: Nanda kumar <na...@apache.org>
Authored: Sat Jul 21 18:46:31 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Sat Jul 21 18:46:31 2018 +0530

----------------------------------------------------------------------
 .../states/endpoint/VersionEndpointTask.java    | 27 ++++++++--
 .../container/common/utils/HddsVolumeUtil.java  | 56 ++++++++++++++++++++
 .../container/ozoneimpl/ContainerReader.java    | 22 +++++---
 .../container/ozoneimpl/OzoneContainer.java     |  2 +-
 .../ozone/container/common/ScmTestMock.java     | 27 +++++++++-
 .../common/TestDatanodeStateMachine.java        | 14 ++---
 .../ozone/container/common/TestEndPoint.java    | 49 +++++++++++++++++
 7 files changed, 177 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
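
The heart of the change: before a datanode uses a volume, the HDDS root of that volume must contain exactly the VERSION file plus one directory named after the SCM ID returned in the version response. Volumes that do not match are moved to the failed-volume list, and if every configured volume fails the check, the version task drives the endpoint to SHUTDOWN instead of proceeding to registration. Below is a minimal, standalone restatement of that layout rule over plain java.io.File; the class and method names are illustrative only, and the real logic is HddsVolumeUtil.checkVolume in the diff that follows.

import java.io.File;

/**
 * Minimal, standalone sketch of the HDDS-249 volume layout rule, assuming
 * a plain java.io.File view of the volume. Illustrative only: the real
 * check is HddsVolumeUtil.checkVolume below, which works on HddsVolume
 * and writes the VERSION file via format() first.
 */
public final class VolumeLayoutRule {

  static boolean isConsistent(File hddsRoot, String scmId) {
    File scmDir = new File(hddsRoot, scmId);
    File[] entries = hddsRoot.listFiles();
    if (entries == null) {
      return false;            // IO error while listing: fail the volume
    } else if (entries.length == 1) {
      return scmDir.mkdir();   // fresh volume: VERSION only, create scm dir
    } else if (entries.length == 2) {
      return scmDir.exists();  // VERSION + one scm dir, which must match scmId
    }
    return false;              // e.g. two SCM ID directories: fail the volume
  }

  public static void main(String[] args) {
    System.out.println(isConsistent(new File(args[0]), args[1]));
  }
}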


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa9e301/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
index d782b59..64e078d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -23,10 +23,14 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
@@ -37,6 +41,8 @@ import java.util.concurrent.Callable;
  */
 public class VersionEndpointTask implements
     Callable<EndpointStateMachine.EndPointStates> {
+  public static final Logger LOG = LoggerFactory.getLogger(VersionEndpointTask
+      .class);
   private final EndpointStateMachine rpcEndPoint;
   private final Configuration configuration;
   private final OzoneContainer ozoneContainer;
@@ -71,21 +77,32 @@ public class VersionEndpointTask implements
 
       Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
           "null");
-      Preconditions.checkNotNull(scmId, "Reply from SCM: clusterId cannot be" +
-          " null");
+      Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
+          "cannot be null");
 
       // If version file does not exist create version file and also set scmId
       for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
         HddsVolume hddsVolume = entry.getValue();
-        hddsVolume.format(clusterId);
-        ozoneContainer.getDispatcher().setScmId(scmId);
+        boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
+            clusterId, LOG);
+        if (!result) {
+          volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
+        }
       }
+      if (volumeSet.getVolumesList().size() == 0) {
+        // All volumes are in an inconsistent state.
+        throw new DiskOutOfSpaceException("All configured Volumes are in " +
+            "Inconsistent State");
+      }
+      ozoneContainer.getDispatcher().setScmId(scmId);
 
       EndpointStateMachine.EndPointStates nextState =
           rpcEndPoint.getState().getNextState();
       rpcEndPoint.setState(nextState);
       rpcEndPoint.zeroMissedCount();
-    } catch (IOException ex) {
+    } catch (DiskOutOfSpaceException ex) {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
+    } catch (IOException ex) {
       rpcEndPoint.logIfNeeded(ex);
     } finally {
       rpcEndPoint.unlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa9e301/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
index 5d6fc0a..bc0bd05 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
@@ -25,8 +25,10 @@ import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -160,4 +162,58 @@ public final class HddsVolumeUtil {
     }
     return value;
   }
+
+  /**
+   * Checks whether the volume is in a consistent state.
+   * @param hddsVolume volume to check
+   * @param scmId SCM ID received in the version response
+   * @param clusterId cluster ID received in the version response
+   * @param logger logger used to report inconsistent volumes
+   * @return true if the volume is in a consistent state, otherwise false.
+   */
+  public static boolean checkVolume(HddsVolume hddsVolume, String scmId, String
+      clusterId, Logger logger) {
+    File hddsRoot = hddsVolume.getHddsRootDir();
+    String volumeRoot = hddsRoot.getPath();
+    File scmDir = new File(hddsRoot, scmId);
+
+    try {
+      hddsVolume.format(clusterId);
+    } catch (IOException ex) {
+      logger.error("Error during formatting volume {}, exception is {}",
+          volumeRoot, ex);
+      return false;
+    }
+
+    File[] hddsFiles = hddsRoot.listFiles();
+
+    if (hddsFiles == null) {
+      // listFiles returns null when an IO error occurs while listing the
+      // directory, so we fail the volume.
+      return false;
+    } else if (hddsFiles.length == 1) {
+      // The DN started for the first time, or this is a newly added volume,
+      // so we create the scm directory.
+      if (!scmDir.mkdir()) {
+        logger.error("Unable to create scmDir {}", scmDir);
+        return false;
+      }
+      return true;
+    } else if (hddsFiles.length == 2) {
+      // The two entries should be the VERSION file and the SCM directory.
+      if (scmDir.exists()) {
+        return true;
+      } else {
+        logger.error("Volume {} is in Inconsistent state, expected scm " +
+                "directory {} does not exist", volumeRoot, scmDir
+            .getAbsolutePath());
+        return false;
+      }
+    } else {
+      // The hdds root dir should always have exactly 2 entries: the VERSION
+      // file and the SCM directory.
+      return false;
+    }
+
+  }
 }
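
To make the three branches above concrete, here is a hedged, self-contained demo that builds, in a temp directory, the on-disk states checkVolume distinguishes: a freshly formatted volume (VERSION only), a healthy volume (VERSION plus one scm directory), and the multiple-SCM-ID state this JIRA guards against. The VERSION file name and the scm-* directory names are assumptions for illustration; the demo uses plain java.io rather than the real HddsVolume API.

import java.io.File;
import java.nio.file.Files;

// Hypothetical demo of the three on-disk states distinguished above.
// Plain java.io only; file and directory names are illustrative.
public final class VolumeStatesDemo {
  public static void main(String[] args) throws Exception {
    File root = Files.createTempDirectory("hdds").toFile();

    new File(root, "VERSION").createNewFile();   // what format() would write
    print(root, "freshly formatted");            // 1 entry -> create scm dir

    new File(root, "scm-A").mkdir();
    print(root, "healthy");                      // 2 entries -> ok if scmId matches

    new File(root, "scm-B").mkdir();
    print(root, "two SCM IDs (HDDS-249)");       // 3 entries -> fail the volume
  }

  private static void print(File root, String state) {
    System.out.println(state + ": " + root.listFiles().length + " entries");
  }
}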

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa9e301/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 986aa16..c1595b2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
@@ -64,14 +65,16 @@ public class ContainerReader implements Runnable {
   private final ContainerSet containerSet;
   private final OzoneConfiguration config;
   private final File hddsVolumeDir;
+  private final VolumeSet volumeSet;
 
-  ContainerReader(HddsVolume volume, ContainerSet cset, OzoneConfiguration
-      conf) {
+  ContainerReader(VolumeSet volSet, HddsVolume volume, ContainerSet cset,
+                  OzoneConfiguration conf) {
     Preconditions.checkNotNull(volume);
     this.hddsVolume = volume;
     this.hddsVolumeDir = hddsVolume.getHddsRootDir();
     this.containerSet = cset;
     this.config = conf;
+    this.volumeSet = volSet;
   }
 
   @Override
@@ -97,10 +100,18 @@ public class ContainerReader implements Runnable {
     });
 
     if (scmDir == null) {
-      LOG.error("Volume {} is empty with out metadata and chunks",
+      LOG.error("IO error for the volume {}, skipped loading",
           hddsVolumeRootDir);
+      volumeSet.failVolume(hddsVolumeRootDir.getPath());
       return;
     }
+
+    if (scmDir.length > 1) {
+      LOG.error("Volume {} is in Inconsistent state", hddsVolumeRootDir);
+      volumeSet.failVolume(hddsVolumeRootDir.getPath());
+      return;
+    }
+
     for (File scmLoc : scmDir) {
       File currentDir = null;
       currentDir = new File(scmLoc, Storage.STORAGE_DIR_CURRENT);
@@ -123,9 +134,8 @@ public class ContainerReader implements Runnable {
                     verifyContainerFile(containerName, containerFile,
                         checksumFile);
                   } else {
-                    LOG.error(
-                        "Missing container metadata files for Container: " +
-                            "{}", containerName);
+                    LOG.error("Missing container metadata files for " +
+                        "Container: {}", containerName);
                   }
                 } else {
                   LOG.error("Missing container metadata directory for " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa9e301/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 8c3a0a2..8f067d9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -106,7 +106,7 @@ public class OzoneContainer {
     while (volumeSetIterator.hasNext()) {
       HddsVolume volume = volumeSetIterator.next();
       File hddsVolumeRootDir = volume.getHddsRootDir();
-      Thread thread = new Thread(new ContainerReader(volume,
+      Thread thread = new Thread(new ContainerReader(volumeSet, volume,
           containerSet, config));
       thread.start();
       volumeThreads.add(thread);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa9e301/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index fb8e7c1..8827d1d 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -56,6 +56,13 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   private AtomicInteger heartbeatCount = new AtomicInteger(0);
   private AtomicInteger rpcCount = new AtomicInteger(0);
   private AtomicInteger containerReportsCount = new AtomicInteger(0);
+  private String clusterId;
+  private String scmId;
+
+  public ScmTestMock() {
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+  }
 
   // Map of datanode to containers
   private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers =
@@ -157,8 +164,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     return VersionResponse.newBuilder()
         .setVersion(versionInfo.getVersion())
         .addValue(VersionInfo.DESCRIPTION_KEY, versionInfo.getDescription())
-        .addValue(OzoneConsts.SCM_ID, UUID.randomUUID().toString())
-        .addValue(OzoneConsts.CLUSTER_ID, UUID.randomUUID().toString())
+        .addValue(OzoneConsts.SCM_ID, scmId)
+        .addValue(OzoneConsts.CLUSTER_ID, clusterId)
         .build().getProtobufMessage();
 
   }
@@ -329,4 +336,20 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   public void addScmCommandRequest(SCMCommandProto scmCmd) {
     scmCommandRequests.add(scmCmd);
   }
+
+  /**
+   * Set scmId.
+   * @param id scmId to set
+   */
+  public void setScmId(String id) {
+    this.scmId = id;
+  }
+
+  /**
+   * Get scmId.
+   * @return scmId
+   */
+  public String getScmId() {
+    return scmId;
+  }
 }
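
Why the mock change matters: previously getVersion handed out a fresh random UUID on every call, so a datanode asking for the version twice would always appear to see two different SCMs. With the IDs fixed at construction and a setter exposed, a test can keep them stable, or deliberately swap the scmId to simulate a mismatch. A minimal sketch of that knob (the real end-to-end assertions are in TestEndPoint.testCheckVersionResponse further down; the class and test names here are illustrative):

import java.util.UUID;

import org.apache.hadoop.ozone.container.common.ScmTestMock;
import org.junit.Assert;
import org.junit.Test;

// Minimal sketch: a stable, settable SCM ID on the mock lets a test
// induce an SCM ID mismatch on purpose.
public class ScmIdMismatchSketch {
  @Test
  public void scmIdCanBeSwapped() {
    ScmTestMock mock = new ScmTestMock();          // fixed scmId/clusterId
    String original = mock.getScmId();             // ID the DN formats with
    mock.setScmId(UUID.randomUUID().toString());   // pretend a different SCM
    Assert.assertNotEquals(original, mock.getScmId());
  }
}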

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa9e301/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index ece7545..59029db 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.ipc.RPC;
@@ -57,9 +58,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -68,7 +69,9 @@ import static org.junit.Assert.assertTrue;
 public class TestDatanodeStateMachine {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestDatanodeStateMachine.class);
-  private final int scmServerCount = 3;
+  // Changed to 1, as the current code checks for multiple scm directories
+  // and fails the volume if more than one exists.
+  private final int scmServerCount = 1;
   private List<String> serverAddresses;
   private List<RPC.Server> scmServers;
   private List<ScmTestMock> mockServers;
@@ -90,7 +93,6 @@ public class TestDatanodeStateMachine {
       String address = "127.0.0.1";
       serverAddresses.add(address + ":" + port);
       ScmTestMock mock = new ScmTestMock();
-
       scmServers.add(SCMTestUtils.startScmRpcServer(conf, mock,
           new InetSocketAddress(address, port), 10));
       mockServers.add(mock);
@@ -107,7 +109,7 @@ public class TestDatanodeStateMachine {
     }
 
     File dataDir = new File(testRoot, "data");
-    conf.set(DFS_DATANODE_DATA_DIR_KEY, dataDir.getAbsolutePath());
+    conf.set(HDDS_DATANODE_DIR_KEY, dataDir.getAbsolutePath());
     if (!dataDir.mkdirs()) {
       LOG.info("Data dir create failed.");
     }
@@ -145,7 +147,7 @@ public class TestDatanodeStateMachine {
     } catch (Exception e) {
       //ignore all execption from the shutdown
     } finally {
-      testRoot.delete();
+      FileUtil.fullyDelete(testRoot);
     }
   }
 
@@ -162,7 +164,7 @@ public class TestDatanodeStateMachine {
       stateMachine.startDaemon();
       SCMConnectionManager connectionManager =
           stateMachine.getConnectionManager();
-      GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 3,
+      GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 1,
           1000, 30000);
 
       stateMachine.stopDaemon();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9fa9e301/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index be8bd87..6619d26 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -70,8 +70,10 @@ import org.apache.hadoop.ozone.container.common.states.endpoint
     .RegisterEndpointTask;
 import org.apache.hadoop.ozone.container.common.states.endpoint
     .VersionEndpointTask;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;
@@ -175,6 +177,53 @@ public class TestEndPoint {
   }
 
   @Test
+  public void testCheckVersionResponse() throws Exception {
+    OzoneConfiguration conf = SCMTestUtils.getConf();
+    try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
+        serverAddress, 1000)) {
+      GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+          .captureLogs(VersionEndpointTask.LOG);
+      OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(),
+          conf);
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf, ozoneContainer);
+      EndpointStateMachine.EndPointStates newState = versionTask.call();
+
+      // If the version call worked, the endpoint should automatically move
+      // to the next state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+          newState);
+
+      // Now rpcEndpoint should remember the version it got from SCM
+      Assert.assertNotNull(rpcEndPoint.getVersion());
+
+      // Now change the server's scmId, so the scmId stored on the datanode
+      // will differ from the scmId in the SCM server's response.
+      String newScmId = UUID.randomUUID().toString();
+      scmServerImpl.setScmId(newScmId);
+      newState = versionTask.call();
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN,
+            newState);
+      List<HddsVolume> volumesList = ozoneContainer.getVolumeSet()
+          .getFailedVolumesList();
+      Assert.assertTrue(volumesList.size() == 1);
+      File expectedScmDir = new File(volumesList.get(0).getHddsRootDir(),
+          scmServerImpl.getScmId());
+      Assert.assertTrue(logCapturer.getOutput().contains("expected scm " +
+          "directory " + expectedScmDir.getAbsolutePath() + " does not " +
+          "exist"));
+      Assert.assertTrue(ozoneContainer.getVolumeSet().getVolumesList().size()
+          == 0);
+      Assert.assertTrue(ozoneContainer.getVolumeSet().getFailedVolumesList()
+          .size() == 1);
+
+    }
+  }
+
+
+
+  @Test
   /**
    * This test makes a call to end point where there is no SCM server. We
    * expect that versionTask should be able to handle it.

