Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2010/10/05 02:56:22 UTC

svn commit: r1004492 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Author: suresh
Date: Tue Oct  5 00:56:22 2010
New Revision: 1004492

URL: http://svn.apache.org/viewvc?rev=1004492&view=rev
Log:
HDFS-1434. Refactor Datanode#startDataNode method into smaller methods. Contributed by Suresh Srinivas.
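
In outline, the patch replaces the single long startDataNode() with a short
driver that delegates to focused private steps: initConfig, initDataXceiver,
initFsDataSet, initBlockScanner, startInfoServer, initIpcServer and
startPlugins. A minimal standalone sketch of the resulting shape, using
illustrative names rather than the real DataNode members:

    import java.io.IOException;

    // Self-contained illustration of the extract-method pattern applied in
    // this commit: one short orchestrating start method, with each startup
    // step small enough to read and reason about on its own. All names here
    // are placeholders, not the DataNode API.
    class StartupSketch {
      void start() throws IOException {
        initConfig();     // read and validate configuration
        initTransport();  // bind sockets, spawn server threads
        initStorage();    // attach or recover on-disk state
        startServices();  // IPC server, HTTP server, plugins
      }

      private void initConfig() { /* parse configuration */ }
      private void initTransport() throws IOException { /* open sockets */ }
      private void initStorage() throws IOException { /* recover storage */ }
      private void startServices() { /* start auxiliary services */ }

      public static void main(String[] args) throws IOException {
        new StartupSketch().start();
      }
    }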

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1004492&r1=1004491&r2=1004492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Oct  5 00:56:22 2010
@@ -158,6 +158,9 @@ Trunk (unreleased changes)
 
     HDFS-1368. Add a block counter to DatanodeDescriptor. (hairong)
 
+    HDFS-1434. Refactor Datanode#startDataNode method into smaller methods.
+    (suresh)
+
   BUG FIXES
 
     HDFS-1039. Adding test for JspHelper.getUGI. (jnp via boryas)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1004492&r1=1004491&r2=1004492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Oct  5 00:56:22 2010
@@ -285,26 +285,7 @@ public class DataNode extends Configured
    }
   }
 
-  /**
-   * This method starts the data node with the specified conf.
-   * 
-   * @param conf - the configuration
-   *  if conf's CONFIG_PROPERTY_SIMULATED property is set
-   *  then a simulated storage based data node is created.
-   * 
-   * @param dataDirs - only for a non-simulated storage data node
-   * @throws IOException
-   */
-  void startDataNode(Configuration conf, 
-                     AbstractList<File> dataDirs,
-                     DatanodeProtocol namenode, SecureResources resources
-                     ) throws IOException {
-    if(UserGroupInformation.isSecurityEnabled() && resources == null)
-      throw new RuntimeException("Cannot start secure cluster without " +
-      "privileged resources.");
-
-    this.secureResources = resources;
-    
+  private void initConfig(Configuration conf) throws UnknownHostException {
     // use configured nameserver & interface to get local hostname
     if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
       machineName = conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);   
@@ -327,70 +308,6 @@ public class DataNode extends Configured
                                              true);
     this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
                                        DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-    InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
-    int tmpPort = socAddr.getPort();
-    storage = new DataStorage();
-    // construct registration
-    this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
-
-    // connect to name node
-    this.namenode = namenode;
-
-    // get version and id info from the name-node
-    NamespaceInfo nsInfo = handshake();
-    StartupOption startOpt = getStartupOption(conf);
-    assert startOpt != null : "Startup option must be set.";
-    
-    boolean simulatedFSDataset = 
-        conf.getBoolean("dfs.datanode.simulateddatastorage", false);
-    if (simulatedFSDataset) {
-        setNewStorageID(dnRegistration);
-        dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
-        dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
-        // it would have been better to pass storage as a parameter to
-        // constructor below - need to augment ReflectionUtils used below.
-        conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
-        try {
-          //Equivalent of following (can't do because Simulated is in test dir)
-          //  this.data = new SimulatedFSDataset(conf);
-          this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
-              Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
-        } catch (ClassNotFoundException e) {
-          throw new IOException(StringUtils.stringifyException(e));
-        }
-    } else { // real storage
-      // read storage info, lock data dirs and transition fs state if necessary
-      storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
-      // adjust
-      this.dnRegistration.setStorageInfo(storage);
-      // initialize data node internal structure
-      this.data = new FSDataset(storage, conf);
-    }
-
-    // register datanode MXBean
-    registerMXBean();
-      
-    // find free port or use privileged port provide
-    ServerSocket ss;
-    if(secureResources == null) {
-      ss = (socketWriteTimeout > 0) ?
-          ServerSocketChannel.open().socket() : new ServerSocket();
-      Server.bind(ss, socAddr, 0);
-    } else {
-      ss = resources.getStreamingSocket();
-    }
-    ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
-    // adjust machine name with the actual port
-    tmpPort = ss.getLocalPort();
-    selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
-                                     tmpPort);
-    this.dnRegistration.setName(machineName + ":" + tmpPort);
-    LOG.info("Opened streaming server at " + tmpPort);
-      
-    this.threadGroup = new ThreadGroup("dataXceiverServer");
-    this.dataXceiverServer = new Daemon(threadGroup, 
-        new DataXceiverServer(ss, conf, this));
-    this.threadGroup.setDaemon(true); // auto destroy when empty
 
     this.blockReportInterval =
       conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
@@ -402,22 +319,10 @@ public class DataNode extends Configured
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
     }
     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
-
-    //initialize periodic block scanner
-    String reason = null;
-    if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
-      reason = "verification is turned off by configuration";
-    } else if ( !(data instanceof FSDataset) ) {
-      reason = "verification is supported only with FSDataset";
-    } 
-    if ( reason == null ) {
-      blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
-    } else {
-      LOG.info("Periodic Block Verification is disabled because " +
-               reason + ".");
-    }
-
-    //create a servlet to serve full-file content
+  }
+  
+  private void startInfoServer(Configuration conf) throws IOException {
+    // create a servlet to serve full-file content
     InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
     String infoHost = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
@@ -453,19 +358,65 @@ public class DataNode extends Configured
     this.infoServer.start();
     // adjust info port
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
-    myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
+  }
+  
+  private void initFsDataSet(Configuration conf, AbstractList<File> dataDirs)
+      throws IOException {
+    // get version and id info from the name-node
+    NamespaceInfo nsInfo = handshake();
 
-    // BlockTokenSecretManager is created here, but it shouldn't be
-    // used until it is initialized in register().
-    this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
+    StartupOption startOpt = getStartupOption(conf);
+    assert startOpt != null : "Startup option must be set.";
     
-    //init ipc server
+    boolean simulatedFSDataset = 
+        conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+    if (simulatedFSDataset) {
+        setNewStorageID(dnRegistration);
+        dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
+        dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
+        // it would have been better to pass storage as a parameter to
+        // constructor below - need to augment ReflectionUtils used below.
+        conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
+        try {
+          //Equivalent of following (can't do because Simulated is in test dir)
+          //  this.data = new SimulatedFSDataset(conf);
+          this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
+              Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
+        } catch (ClassNotFoundException e) {
+          throw new IOException(StringUtils.stringifyException(e));
+        }
+    } else { // real storage
+      // read storage info, lock data dirs and transition fs state if necessary
+      storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
+      // adjust
+      this.dnRegistration.setStorageInfo(storage);
+      // initialize data node internal structure
+      this.data = new FSDataset(storage, conf);
+    }
+  }
+
+  private void startPlugins(Configuration conf) {
+    plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
+    for (ServicePlugin p: plugins) {
+      try {
+        p.start(this);
+        LOG.info("Started plug-in " + p);
+      } catch (Throwable t) {
+        LOG.warn("ServicePlugin " + p + " could not be started", t);
+      }
+    }
+  }
+
+  private void initIpcServer(Configuration conf) throws IOException {
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
         conf.get("dfs.datanode.ipc.address"));
     ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
         ipcAddr.getPort(), conf.getInt("dfs.datanode.handler.count", 3), false,
         conf, blockTokenSecretManager);
-
+    
     // set service-level authorization security policy
     if (conf.getBoolean(
         CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
@@ -473,19 +424,93 @@ public class DataNode extends Configured
     }
 
     dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
-
     LOG.info("dnRegistration = " + dnRegistration);
-    
-    plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
-    for (ServicePlugin p: plugins) {
-      try {
-        p.start(this);
-        LOG.info("Started plug-in " + p);
-      } catch (Throwable t) {
-        LOG.warn("ServicePlugin " + p + " could not be started", t);
-      }
+  }
+
+  private void initBlockScanner(Configuration conf) {
+    String reason = null;
+    if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
+      reason = "verification is turned off by configuration";
+    } else if ( !(data instanceof FSDataset) ) {
+      reason = "verification is supported only with FSDataset";
+    } 
+    if ( reason == null ) {
+      blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
+    } else {
+      LOG.info("Periodic Block Verification is disabled because " +
+               reason + ".");
     }
   }
+  
+  private void initDataXceiver(Configuration conf) throws IOException {
+    // construct registration
+    InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
+    int tmpPort = socAddr.getPort();
+    this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
+
+    // find free port or use privileged port provided
+    ServerSocket ss;
+    if(secureResources == null) {
+      ss = (socketWriteTimeout > 0) ?
+          ServerSocketChannel.open().socket() : new ServerSocket();
+      Server.bind(ss, socAddr, 0);
+    } else {
+      ss = secureResources.getStreamingSocket();
+    }
+    ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
+    // adjust machine name with the actual port
+    tmpPort = ss.getLocalPort();
+    selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
+                                     tmpPort);
+    this.dnRegistration.setName(machineName + ":" + tmpPort);
+    LOG.info("Opened streaming server at " + tmpPort);
+      
+    this.threadGroup = new ThreadGroup("dataXceiverServer");
+    this.dataXceiverServer = new Daemon(threadGroup, 
+        new DataXceiverServer(ss, conf, this));
+    this.threadGroup.setDaemon(true); // auto destroy when empty
+  }
+  
+  /**
+   * This method starts the data node with the specified conf.
+   * 
+   * @param conf - the configuration
+   *  if conf's CONFIG_PROPERTY_SIMULATED property is set
+   *  then a simulated storage based data node is created.
+   * 
+   * @param dataDirs - only for a non-simulated storage data node
+   * @throws IOException
+   */
+  void startDataNode(Configuration conf, 
+                     AbstractList<File> dataDirs,
+                     DatanodeProtocol namenode, SecureResources resources
+                     ) throws IOException {
+    if(UserGroupInformation.isSecurityEnabled() && resources == null)
+      throw new RuntimeException("Cannot start secure cluster without " +
+      "privileged resources.");
+
+    this.secureResources = resources;
+    this.namenode = namenode;
+    storage = new DataStorage();
+    
+    initConfig(conf);
+    registerMXBean();
+    initDataXceiver(conf);
+    initFsDataSet(conf, dataDirs);
+    initBlockScanner(conf);
+    startInfoServer(conf);
+  
+    myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
+    // TODO check what code removed here
+
+    // BlockTokenSecretManager is created here, but it shouldn't be
+    // used until it is initialized in register(). It must be constructed
+    // before initIpcServer(), which passes it to RPC.getServer().
+    this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
+
+    initIpcServer(conf);
+    startPlugins(conf);
+  }
 
   /**
    * Determine the http server's effective addr
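
One ordering point in the reassembled startDataNode(): initIpcServer() hands
blockTokenSecretManager to RPC.getServer(), so the secret manager must be
constructed before the IPC server is built; assigning the field afterwards
would leave the RPC server holding a null manager. A standalone sketch of
that hazard, with stand-in types rather than the Hadoop RPC API:

    // The builder captures the field's value at call time; assigning the
    // field later does not retrofit the object that was already built.
    class OrderHazard {
      private Object manager;  // stands in for blockTokenSecretManager

      private Object buildServer() {
        return manager;        // stands in for RPC.getServer(..., manager)
      }

      public static void main(String[] args) {
        OrderHazard dn = new OrderHazard();
        Object serverBuiltTooEarly = dn.buildServer();   // captures null
        dn.manager = new Object();                       // too late for that server
        System.out.println(serverBuiltTooEarly == null); // prints true
      }
    }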