You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2010/10/05 02:56:22 UTC
svn commit: r1004492 - in /hadoop/hdfs/trunk: CHANGES.txt
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Author: suresh
Date: Tue Oct 5 00:56:22 2010
New Revision: 1004492
URL: http://svn.apache.org/viewvc?rev=1004492&view=rev
Log:
HDFS-1434. Refactor DataNode#startDataNode method into smaller methods. Contributed by Suresh Srinivas.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1004492&r1=1004491&r2=1004492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Oct 5 00:56:22 2010
@@ -158,6 +158,9 @@ Trunk (unreleased changes)
HDFS-1368. Add a block counter to DatanodeDescriptor. (hairong)
+ HDFS-1434. Refactor Datanode#startDataNode method into smaller methods.
+ (suresh)
+
BUG FIXES
HDFS-1039. Adding test for JspHelper.getUGI(jnp via boryas)
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1004492&r1=1004491&r2=1004492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Oct 5 00:56:22 2010
@@ -285,26 +285,7 @@ public class DataNode extends Configured
}
}
- /**
- * This method starts the data node with the specified conf.
- *
- * @param conf - the configuration
- * if conf's CONFIG_PROPERTY_SIMULATED property is set
- * then a simulated storage based data node is created.
- *
- * @param dataDirs - only for a non-simulated storage data node
- * @throws IOException
- */
- void startDataNode(Configuration conf,
- AbstractList<File> dataDirs,
- DatanodeProtocol namenode, SecureResources resources
- ) throws IOException {
- if(UserGroupInformation.isSecurityEnabled() && resources == null)
- throw new RuntimeException("Cannot start secure cluster without " +
- "privileged resources.");
-
- this.secureResources = resources;
-
+ private void initConfig(Configuration conf) throws UnknownHostException {
// use configured nameserver & interface to get local hostname
if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
machineName = conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);
@@ -327,70 +308,6 @@ public class DataNode extends Configured
true);
this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
- InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
- int tmpPort = socAddr.getPort();
- storage = new DataStorage();
- // construct registration
- this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
-
- // connect to name node
- this.namenode = namenode;
-
- // get version and id info from the name-node
- NamespaceInfo nsInfo = handshake();
- StartupOption startOpt = getStartupOption(conf);
- assert startOpt != null : "Startup option must be set.";
-
- boolean simulatedFSDataset =
- conf.getBoolean("dfs.datanode.simulateddatastorage", false);
- if (simulatedFSDataset) {
- setNewStorageID(dnRegistration);
- dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
- dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
- // it would have been better to pass storage as a parameter to
- // constructor below - need to augment ReflectionUtils used below.
- conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
- try {
- //Equivalent of following (can't do because Simulated is in test dir)
- // this.data = new SimulatedFSDataset(conf);
- this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
- Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
- } catch (ClassNotFoundException e) {
- throw new IOException(StringUtils.stringifyException(e));
- }
- } else { // real storage
- // read storage info, lock data dirs and transition fs state if necessary
- storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
- // adjust
- this.dnRegistration.setStorageInfo(storage);
- // initialize data node internal structure
- this.data = new FSDataset(storage, conf);
- }
-
- // register datanode MXBean
- registerMXBean();
-
- // find free port or use privileged port provide
- ServerSocket ss;
- if(secureResources == null) {
- ss = (socketWriteTimeout > 0) ?
- ServerSocketChannel.open().socket() : new ServerSocket();
- Server.bind(ss, socAddr, 0);
- } else {
- ss = resources.getStreamingSocket();
- }
- ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
- // adjust machine name with the actual port
- tmpPort = ss.getLocalPort();
- selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
- tmpPort);
- this.dnRegistration.setName(machineName + ":" + tmpPort);
- LOG.info("Opened info server at " + tmpPort);
-
- this.threadGroup = new ThreadGroup("dataXceiverServer");
- this.dataXceiverServer = new Daemon(threadGroup,
- new DataXceiverServer(ss, conf, this));
- this.threadGroup.setDaemon(true); // auto destroy when empty
this.blockReportInterval =
conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
@@ -402,22 +319,10 @@ public class DataNode extends Configured
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
}
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
-
- //initialize periodic block scanner
- String reason = null;
- if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
- reason = "verification is turned off by configuration";
- } else if ( !(data instanceof FSDataset) ) {
- reason = "verifcation is supported only with FSDataset";
- }
- if ( reason == null ) {
- blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
- } else {
- LOG.info("Periodic Block Verification is disabled because " +
- reason + ".");
- }
-
- //create a servlet to serve full-file content
+ }
+
+ private void startInfoServer(Configuration conf) throws IOException {
+ // create a servlet to serve full-file content
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
String infoHost = infoSocAddr.getHostName();
int tmpInfoPort = infoSocAddr.getPort();
@@ -453,19 +358,65 @@ public class DataNode extends Configured
this.infoServer.start();
// adjust info port
this.dnRegistration.setInfoPort(this.infoServer.getPort());
- myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
+ }
+
+ private void initFsDataSet(Configuration conf, AbstractList<File> dataDirs)
+ throws IOException {
+ // get version and id info from the name-node
+ NamespaceInfo nsInfo = handshake();
- // BlockTokenSecretManager is created here, but it shouldn't be
- // used until it is initialized in register().
- this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
+ StartupOption startOpt = getStartupOption(conf);
+ assert startOpt != null : "Startup option must be set.";
- //init ipc server
+
+ boolean simulatedFSDataset =
+ conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+ if (simulatedFSDataset) {
+ setNewStorageID(dnRegistration);
+ dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
+ dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
+ // it would have been better to pass storage as a parameter to
+ // constructor below - need to augment ReflectionUtils used below.
+ conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
+ try {
+ //Equivalent of following (can't do because Simulated is in test dir)
+ // this.data = new SimulatedFSDataset(conf);
+ this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
+ Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(StringUtils.stringifyException(e));
+ }
+ } else { // real storage
+ // read storage info, lock data dirs and transition fs state if necessary
+ storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
+ // adjust
+ this.dnRegistration.setStorageInfo(storage);
+ // initialize data node internal structure
+ this.data = new FSDataset(storage, conf);
+ }
+ }
+
+
+ private void startPlugins(Configuration conf) {
+ plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
+ for (ServicePlugin p: plugins) {
+ try {
+ p.start(this);
+ LOG.info("Started plug-in " + p);
+ } catch (Throwable t) {
+ LOG.warn("ServicePlugin " + p + " could not be started", t);
+ }
+ }
+ }
+
+
+ private void initIpcServer(Configuration conf) throws IOException {
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
conf.get("dfs.datanode.ipc.address"));
ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
ipcAddr.getPort(), conf.getInt("dfs.datanode.handler.count", 3), false,
conf, blockTokenSecretManager);
-
+
// set service-level authorization security policy
if (conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
@@ -473,19 +424,93 @@ public class DataNode extends Configured
}
dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
-
LOG.info("dnRegistration = " + dnRegistration);
-
- plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
- for (ServicePlugin p: plugins) {
- try {
- p.start(this);
- LOG.info("Started plug-in " + p);
- } catch (Throwable t) {
- LOG.warn("ServicePlugin " + p + " could not be started", t);
- }
+ }
+
+
+ private void initBlockScanner(Configuration conf) {
+ String reason = null;
+ if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
+ reason = "verification is turned off by configuration";
+ } else if ( !(data instanceof FSDataset) ) {
+ reason = "verifcation is supported only with FSDataset";
+ }
+ if ( reason == null ) {
+ blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
+ } else {
+ LOG.info("Periodic Block Verification is disabled because " +
+ reason + ".");
}
}
+
+ private void initDataXceiver(Configuration conf) throws IOException {
+ // construct registration
+ InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
+ int tmpPort = socAddr.getPort();
+ this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
+
+ // find free port or use privileged port provided
+ ServerSocket ss;
+ if(secureResources == null) {
+ ss = (socketWriteTimeout > 0) ?
+ ServerSocketChannel.open().socket() : new ServerSocket();
+ Server.bind(ss, socAddr, 0);
+ } else {
+ ss = secureResources.getStreamingSocket();
+ }
+ ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
+ // adjust machine name with the actual port
+ tmpPort = ss.getLocalPort();
+ selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
+ tmpPort);
+ this.dnRegistration.setName(machineName + ":" + tmpPort);
+ LOG.info("Opened info server at " + tmpPort);
+
+ this.threadGroup = new ThreadGroup("dataXceiverServer");
+ this.dataXceiverServer = new Daemon(threadGroup,
+ new DataXceiverServer(ss, conf, this));
+ this.threadGroup.setDaemon(true); // auto destroy when empty
+ }
+
+ /**
+ * This method starts the data node with the specified conf.
+ *
+ * @param conf - the configuration
+ * if conf's CONFIG_PROPERTY_SIMULATED property is set
+ * then a simulated storage based data node is created.
+ *
+ * @param dataDirs - only for a non-simulated storage data node
+ * @throws IOException
+ */
+ void startDataNode(Configuration conf,
+ AbstractList<File> dataDirs,
+ DatanodeProtocol namenode, SecureResources resources
+ ) throws IOException {
+ if(UserGroupInformation.isSecurityEnabled() && resources == null)
+ throw new RuntimeException("Cannot start secure cluster without " +
+ "privileged resources.");
+
+ this.secureResources = resources;
+ this.namenode = namenode;
+ storage = new DataStorage();
+
+ initConfig(conf);
+ registerMXBean();
+ initDataXceiver(conf);
+ initFsDataSet(conf, dataDirs);
+ initBlockScanner(conf);
+ startInfoServer(conf);
+
+ myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
+ // TODO check what code removed here
+
+ initIpcServer(conf);
+ startPlugins(conf);
+
+ // BlockTokenSecretManager is created here, but it shouldn't be
+ // used until it is initialized in register().
+ this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
+ }
/**
* Determine the http server's effective addr