You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/02/01 19:40:25 UTC
[06/50] [abbrv] hadoop git commit: HDFS-9094. Add command line option
to ask NameNode reload configuration. (Contributed by Xiaobing Zhou)
HDFS-9094. Add command line option to ask NameNode reload configuration. (Contributed by Xiaobing Zhou)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d62b4a4d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d62b4a4d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d62b4a4d
Branch: refs/heads/HDFS-7240
Commit: d62b4a4de75edb840df6634f49cb4beb74e3fb07
Parents: 6eacdea
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Jan 25 12:17:05 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Mon Jan 25 12:17:05 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 8 +
.../hdfs/protocol/ReconfigurationProtocol.java | 4 +
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../ReconfigurationProtocolServerSideUtils.java | 4 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 35 +++
.../hdfs/server/protocol/NamenodeProtocols.java | 2 +
.../org/apache/hadoop/hdfs/tools/DFSAdmin.java | 254 +++++++++++++------
.../apache/hadoop/hdfs/tools/TestDFSAdmin.java | 162 ++++++++----
8 files changed, 350 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 62c5d81..8f6ed14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -38,9 +38,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
@@ -496,6 +498,12 @@ public class DFSUtilClient {
return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
}
+ public static ReconfigurationProtocol createReconfigurationProtocolProxy(
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory) throws IOException {
+ return new ReconfigurationProtocolTranslatorPB(addr, ticket, conf, factory);
+ }
+
/**
* Creates a new KeyProvider from the given Configuration.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
index 75dc877..8370438 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReconfigurationProtocol.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.io.retry.Idempotent;
/**********************************************************************
* ReconfigurationProtocol is used by HDFS admin to reload configuration
@@ -39,16 +40,19 @@ public interface ReconfigurationProtocol {
/**
* Asynchronously reload configuration on disk and apply changes.
*/
+ @Idempotent
void startReconfiguration() throws IOException;
/**
* Get the status of the previously issued reconfig task.
* @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}.
*/
+ @Idempotent
ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
/**
* Get a list of allowed properties for reconfiguration.
*/
+ @Idempotent
List<String> listReconfigurableProperties() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 49b4d8a..e5285b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -953,6 +953,9 @@ Release 2.9.0 - UNRELEASED
HDFS-9674. The HTrace span for OpWriteBlock should record the maxWriteToDisk
time. (cmccabe via zhz)
+ HDFS-9094. Add command line option to ask NameNode reload
+ configuration. (Xiaobing Zhou via Arpit Agarwal)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
index b2be9cd..9e24204 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolServerSideUtils.java
@@ -41,9 +41,7 @@ public final class ReconfigurationProtocolServerSideUtils {
List<String> reconfigurableProperties) {
ListReconfigurablePropertiesResponseProto.Builder builder =
ListReconfigurablePropertiesResponseProto.newBuilder();
- for (String name : reconfigurableProperties) {
- builder.addName(name);
- }
+ builder.addAllName(reconfigurableProperties);
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 7785260..c1646c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -42,6 +42,8 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.CacheFlag;
@@ -111,12 +113,15 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -286,6 +291,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
BlockingService haPbService = HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
+ ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
+ = new ReconfigurationProtocolServerSideTranslatorPB(this);
+ BlockingService reconfigurationPbService = ReconfigurationProtocolService
+ .newReflectiveBlockingService(reconfigurationProtocolXlator);
+
TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService
@@ -319,6 +329,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
// Add all the RPC protocols that the namenode implements
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
serviceRpcServer);
+ DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+ reconfigurationPbService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
serviceRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
@@ -403,6 +415,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
// Add all the RPC protocols that the namenode implements
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
clientRpcServer);
+ DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+ reconfigurationPbService, clientRpcServer);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
clientRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
@@ -2173,4 +2187,25 @@ class NameNodeRpcServer implements NamenodeProtocols {
checkNNStartup();
return namesystem.getErasureCodingPolicy(src);
}
+
+ @Override // ReconfigurationProtocol
+ public void startReconfiguration() {
+ throw new UnsupportedOperationException(
+ "Namenode startReconfiguration is not implemented.",
+ new ReconfigurationException());
+ }
+
+ @Override // ReconfigurationProtocol
+ public ReconfigurationTaskStatus getReconfigurationStatus() {
+ throw new UnsupportedOperationException(
+ " Namenode getReconfigurationStatus is not implemented.",
+ new ReconfigurationException());
+ }
+
+ @Override // ReconfigurationProtocol
+ public List<String> listReconfigurableProperties() {
+ throw new UnsupportedOperationException(
+ " Namenode listReconfigurableProperties is not implemented.",
+ new ReconfigurationException());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
index 23b6f2e..4a3d83d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
@@ -35,6 +36,7 @@ public interface NamenodeProtocols
DatanodeProtocol,
NamenodeProtocol,
RefreshAuthorizationPolicyProtocol,
+ ReconfigurationProtocol,
RefreshUserMappingsProtocol,
RefreshCallQueueProtocol,
GenericRefreshProtocol,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 5da3bc5..9c782e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -37,6 +37,7 @@ import java.util.TreeSet;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,6 +70,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -414,7 +416,8 @@ public class DFSAdmin extends FsShell {
"\t[-refreshSuperUserGroupsConfiguration]\n" +
"\t[-refreshCallQueue]\n" +
"\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
- "\t[-reconfig <datanode|...> <host:ipc_port> <start|status|properties>]\n" +
+ "\t[-reconfig <namenode|datanode> <host:ipc_port> " +
+ "<start|status|properties>]\n" +
"\t[-printTopology]\n" +
"\t[-refreshNamenodes datanode_host:ipc_port]\n"+
"\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
@@ -1028,12 +1031,12 @@ public class DFSAdmin extends FsShell {
String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
- String reconfig = "-reconfig <datanode|...> <host:ipc_port> <start|status|properties>:\n" +
+ String reconfig = "-reconfig <namenode|datanode> <host:ipc_port> " +
+ "<start|status|properties>:\n" +
"\tStarts or gets the status of a reconfiguration operation, \n" +
"\tor gets a list of reconfigurable properties.\n" +
- "\tThe second parameter specifies the node type.\n" +
- "\tCurrently, only reloading DataNode's configuration is supported.\n";
+ "\tThe second parameter specifies the node type\n";
String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" +
"\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" +
"\ton <hostname:port>. All other args after are sent to the host.\n";
@@ -1494,104 +1497,186 @@ public class DFSAdmin extends FsShell {
String nodeType = argv[i];
String address = argv[i + 1];
String op = argv[i + 2];
+
if ("start".equals(op)) {
- return startReconfiguration(nodeType, address);
+ return startReconfiguration(nodeType, address, System.out, System.err);
} else if ("status".equals(op)) {
return getReconfigurationStatus(nodeType, address, System.out, System.err);
} else if ("properties".equals(op)) {
- return getReconfigurableProperties(
- nodeType, address, System.out, System.err);
+ return getReconfigurableProperties(nodeType, address, System.out,
+ System.err);
}
System.err.println("Unknown operation: " + op);
return -1;
}
- int startReconfiguration(String nodeType, String address) throws IOException {
- if ("datanode".equals(nodeType)) {
- ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
- dnProxy.startReconfiguration();
- System.out.println("Started reconfiguration task on DataNode " + address);
+ int startReconfiguration(final String nodeThpe, final String address)
+ throws IOException {
+ return startReconfiguration(nodeThpe, address, System.out, System.err);
+ }
+
+ int startReconfiguration(final String nodeType, final String address,
+ final PrintStream out, final PrintStream err) throws IOException {
+ String outMsg = null;
+ String errMsg = null;
+ int ret = 0;
+
+ try {
+ ret = startReconfigurationDispatch(nodeType, address, out, err);
+ outMsg = String.format("Started reconfiguration task on node [%s].",
+ address);
+ } catch (IOException e) {
+ errMsg = String.format("Node [%s] reconfiguring: %s.", address,
+ e.toString());
+ }
+
+ if (errMsg != null) {
+ err.println(errMsg);
+ return 1;
+ } else {
+ out.println(outMsg);
+ return ret;
+ }
+ }
+
+ int startReconfigurationDispatch(final String nodeType,
+ final String address, final PrintStream out, final PrintStream err)
+ throws IOException {
+ if ("namenode".equals(nodeType)) {
+ ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+ reconfProxy.startReconfiguration();
+ return 0;
+ } else if ("datanode".equals(nodeType)) {
+ ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+ reconfProxy.startReconfiguration();
return 0;
} else {
- System.err.println("Node type " + nodeType +
- " does not support reconfiguration.");
+ System.err.println("Node type " + nodeType
+ + " does not support reconfiguration.");
return 1;
}
}
- int getReconfigurationStatus(String nodeType, String address,
- PrintStream out, PrintStream err) throws IOException {
- if ("datanode".equals(nodeType)) {
- ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
- try {
- ReconfigurationTaskStatus status = dnProxy.getReconfigurationStatus();
- out.print("Reconfiguring status for DataNode[" + address + "]: ");
- if (!status.hasTask()) {
- out.println("no task was found.");
- return 0;
- }
- out.print("started at " + new Date(status.getStartTime()));
- if (!status.stopped()) {
- out.println(" and is still running.");
- return 0;
- }
+ int getReconfigurationStatus(final String nodeType, final String address,
+ final PrintStream out, final PrintStream err) throws IOException {
+ String outMsg = null;
+ String errMsg = null;
+ ReconfigurationTaskStatus status = null;
- out.println(" and finished at " +
- new Date(status.getEndTime()).toString() + ".");
- if (status.getStatus() == null) {
- // Nothing to report.
- return 0;
- }
- for (Map.Entry<PropertyChange, Optional<String>> result :
- status.getStatus().entrySet()) {
- if (!result.getValue().isPresent()) {
- out.printf(
- "SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
- result.getKey().prop, result.getKey().oldVal,
- result.getKey().newVal);
- } else {
- final String errorMsg = result.getValue().get();
- out.printf(
- "FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
- result.getKey().prop, result.getKey().oldVal,
- result.getKey().newVal);
- out.println("\tError: " + errorMsg + ".");
- }
+ try {
+ status = getReconfigurationStatusDispatch(nodeType, address, out, err);
+ outMsg = String.format("Reconfiguring status for node [%s]: ", address);
+ } catch (IOException e) {
+ errMsg = String.format("Node [%s] reloading configuration: %s.", address,
+ e.toString());
+ }
+
+ if (errMsg != null) {
+ err.println(errMsg);
+ return 1;
+ } else {
+ out.print(outMsg);
+ }
+
+ if (status != null) {
+ if (!status.hasTask()) {
+ out.println("no task was found.");
+ return 0;
+ }
+ out.print("started at " + new Date(status.getStartTime()));
+ if (!status.stopped()) {
+ out.println(" and is still running.");
+ return 0;
+ }
+
+ out.println(" and finished at "
+ + new Date(status.getEndTime()).toString() + ".");
+ if (status.getStatus() == null) {
+ // Nothing to report.
+ return 0;
+ }
+ for (Map.Entry<PropertyChange, Optional<String>> result : status
+ .getStatus().entrySet()) {
+ if (!result.getValue().isPresent()) {
+ out.printf(
+ "SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
+ result.getKey().prop, result.getKey().oldVal,
+ result.getKey().newVal);
+ } else {
+ final String errorMsg = result.getValue().get();
+ out.printf(
+ "FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
+ result.getKey().prop, result.getKey().oldVal,
+ result.getKey().newVal);
+ out.println("\tError: " + errorMsg + ".");
}
- } catch (IOException e) {
- err.println("DataNode reloading configuration: " + e + ".");
- return 1;
}
} else {
- err.println("Node type " + nodeType +
- " does not support reconfiguration.");
return 1;
}
+
return 0;
}
- int getReconfigurableProperties(String nodeType, String address,
- PrintStream out, PrintStream err) throws IOException {
- if ("datanode".equals(nodeType)) {
- ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
- try {
- List<String> properties =
- dnProxy.listReconfigurableProperties();
- out.println(
- "Configuration properties that are allowed to be reconfigured:");
- for (String name : properties) {
- out.println(name);
- }
- } catch (IOException e) {
- err.println("DataNode reconfiguration: " + e + ".");
- return 1;
- }
+ ReconfigurationTaskStatus getReconfigurationStatusDispatch(
+ final String nodeType, final String address, final PrintStream out,
+ final PrintStream err) throws IOException {
+ if ("namenode".equals(nodeType)) {
+ ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+ return reconfProxy.getReconfigurationStatus();
+ } else if ("datanode".equals(nodeType)) {
+ ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+ return reconfProxy.getReconfigurationStatus();
} else {
- err.println("Node type " + nodeType +
- " does not support reconfiguration.");
+ err.println("Node type " + nodeType
+ + " does not support reconfiguration.");
+ return null;
+ }
+ }
+
+ int getReconfigurableProperties(final String nodeType, final String address,
+ final PrintStream out, final PrintStream err) throws IOException {
+ String outMsg = null;
+ String errMsg = null;
+ List<String> properties = null;
+
+ try {
+ properties = getReconfigurablePropertiesDispatch(nodeType, address, out,
+ err);
+ outMsg = String.format("Node [%s] Reconfigurable properties:", address);
+ } catch (IOException e) {
+ errMsg = String.format("Node [%s] reconfiguration: %s.", address,
+ e.toString());
+ }
+
+ if (errMsg != null) {
+ err.println(errMsg);
return 1;
+ } else if (properties == null) {
+ return 1;
+ } else {
+ out.println(outMsg);
+ for (String name : properties) {
+ out.println(name);
+ }
+ return 0;
+ }
+ }
+
+ List<String> getReconfigurablePropertiesDispatch(final String nodeType,
+ final String address, final PrintStream out, final PrintStream err)
+ throws IOException {
+ if ("namenode".equals(nodeType)) {
+ ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
+ return reconfProxy.listReconfigurableProperties();
+ } else if ("datanode".equals(nodeType)) {
+ ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
+ return reconfProxy.listReconfigurableProperties();
+ } else {
+ err.println("Node type " + nodeType
+ + " does not support reconfiguration.");
+ return null;
}
- return 0;
}
public int genericRefresh(String[] argv, int i) throws IOException {
@@ -1712,7 +1797,7 @@ public class DFSAdmin extends FsShell {
+ " [-refreshCallQueue]");
} else if ("-reconfig".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
- + " [-reconfig <datanode|...> <host:port> <start|status>]");
+ + " [-reconfig <namenode|datanode> <host:port> <start|status>]");
} else if ("-refresh".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-refresh <hostname:port> <resource_identifier> [arg1..argn]");
@@ -2028,6 +2113,23 @@ public class DFSAdmin extends FsShell {
NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class));
return dnProtocol;
}
+
+ private ReconfigurationProtocol getNameNodeProxy(String node)
+ throws IOException {
+ InetSocketAddress nodeAddr = NetUtils.createSocketAddr(node);
+ // Get the current configuration
+ Configuration conf = getConf();
+
+ // For namenode proxy the server principal should be NN's one.
+ conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
+ conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
+
+ // Create the client
+ ReconfigurationProtocol reconfigProtocol = DFSUtilClient
+ .createReconfigurationProtocolProxy(nodeAddr, getUGI(), conf,
+ NetUtils.getSocketFactory(conf, ReconfigurationProtocol.class));
+ return reconfigProtocol;
+ }
private int deleteBlockPool(String[] argv, int i) throws IOException {
ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d62b4a4d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 3a30ccf..a3ed4f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.tools;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import com.google.common.collect.Lists;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -27,6 +30,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -52,10 +56,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestDFSAdmin {
+ private static final Log LOG = LogFactory.getLog(DFSAdmin.class);
private Configuration conf = null;
private MiniDFSCluster cluster;
private DFSAdmin admin;
private DataNode datanode;
+ private NameNode namenode;
@Before
public void setUp() throws Exception {
@@ -80,21 +86,64 @@ public class TestDFSAdmin {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
datanode = cluster.getDataNodes().get(0);
+ namenode = cluster.getNameNode();
+ }
+
+ private void startReconfiguration(String nodeType, String address,
+ final List<String> outs, final List<String> errs) throws IOException {
+ reconfigurationOutErrFormatter("startReconfiguration", nodeType,
+ address, outs, errs);
+ }
+
+ private void getReconfigurableProperties(String nodeType, String address,
+ final List<String> outs, final List<String> errs) throws IOException {
+ reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType,
+ address, outs, errs);
+ }
+
+ private void getReconfigurationStatus(String nodeType, String address,
+ final List<String> outs, final List<String> errs) throws IOException {
+ reconfigurationOutErrFormatter("getReconfigurationStatus", nodeType,
+ address, outs, errs);
}
- private List<String> getReconfigureStatus(String nodeType, String address)
- throws IOException {
+ private void reconfigurationOutErrFormatter(String methodName,
+ String nodeType, String address, final List<String> outs,
+ final List<String> errs) throws IOException {
ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bufOut);
ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
PrintStream err = new PrintStream(bufErr);
- admin.getReconfigurationStatus(nodeType, address, out, err);
+
+ if (methodName.equals("getReconfigurableProperties")) {
+ admin.getReconfigurableProperties(nodeType, address, out, err);
+ } else if (methodName.equals("getReconfigurationStatus")) {
+ admin.getReconfigurationStatus(nodeType, address, out, err);
+ } else if (methodName.equals("startReconfiguration")) {
+ admin.startReconfiguration(nodeType, address, out, err);
+ }
+
Scanner scanner = new Scanner(bufOut.toString());
- List<String> outputs = Lists.newArrayList();
while (scanner.hasNextLine()) {
- outputs.add(scanner.nextLine());
+ outs.add(scanner.nextLine());
+ }
+ scanner.close();
+ scanner = new Scanner(bufErr.toString());
+ while (scanner.hasNextLine()) {
+ errs.add(scanner.nextLine());
}
- return outputs;
+ scanner.close();
+ }
+
+ @Test(timeout = 30000)
+ public void testDataNodeGetReconfigurableProperties() throws IOException {
+ final int port = datanode.getIpcPort();
+ final String address = "localhost:" + port;
+ final List<String> outs = Lists.newArrayList();
+ final List<String> errs = Lists.newArrayList();
+ getReconfigurableProperties("datanode", address, outs, errs);
+ assertEquals(3, outs.size());
+ assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}
/**
@@ -103,7 +152,7 @@ public class TestDFSAdmin {
* @throws IOException
* @throws InterruptedException
*/
- private void testGetReconfigurationStatus(boolean expectedSuccuss)
+ private void testDataNodeGetReconfigurationStatus(boolean expectedSuccuss)
throws IOException, InterruptedException {
ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
datanode.setReconfigurationUtil(ru);
@@ -130,21 +179,25 @@ public class TestDFSAdmin {
assertThat(admin.startReconfiguration("datanode", address), is(0));
- List<String> outputs = null;
int count = 100;
+ final List<String> outs = Lists.newArrayList();
+ final List<String> errs = Lists.newArrayList();
while (count > 0) {
- outputs = getReconfigureStatus("datanode", address);
- if (!outputs.isEmpty() && outputs.get(0).contains("finished")) {
+ outs.clear();
+ errs.clear();
+ getReconfigurationStatus("datanode", address, outs, errs);
+ if (!outs.isEmpty() && outs.get(0).contains("finished")) {
break;
}
count--;
Thread.sleep(100);
}
+ LOG.info(String.format("count=%d", count));
assertTrue(count > 0);
if (expectedSuccuss) {
- assertThat(outputs.size(), is(4));
+ assertThat(outs.size(), is(4));
} else {
- assertThat(outputs.size(), is(6));
+ assertThat(outs.size(), is(6));
}
List<StorageLocation> locations = DataNode.getStorageLocations(
@@ -160,55 +213,78 @@ public class TestDFSAdmin {
int offset = 1;
if (expectedSuccuss) {
- assertThat(outputs.get(offset),
+ assertThat(outs.get(offset),
containsString("SUCCESS: Changed property " +
DFS_DATANODE_DATA_DIR_KEY));
} else {
- assertThat(outputs.get(offset),
+ assertThat(outs.get(offset),
containsString("FAILED: Change property " +
DFS_DATANODE_DATA_DIR_KEY));
}
- assertThat(outputs.get(offset + 1),
+ assertThat(outs.get(offset + 1),
is(allOf(containsString("From:"), containsString("data1"),
containsString("data2"))));
- assertThat(outputs.get(offset + 2),
+ assertThat(outs.get(offset + 2),
is(not(anyOf(containsString("data1"), containsString("data2")))));
- assertThat(outputs.get(offset + 2),
+ assertThat(outs.get(offset + 2),
is(allOf(containsString("To"), containsString("data_new"))));
}
@Test(timeout = 30000)
- public void testGetReconfigurationStatus()
- throws IOException, InterruptedException {
- testGetReconfigurationStatus(true);
+ public void testDataNodeGetReconfigurationStatus() throws IOException,
+ InterruptedException {
+ testDataNodeGetReconfigurationStatus(true);
restartCluster();
- testGetReconfigurationStatus(false);
+ testDataNodeGetReconfigurationStatus(false);
}
- private List<String> getReconfigurationAllowedProperties(
- String nodeType, String address)
- throws IOException {
- ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
- PrintStream out = new PrintStream(bufOut);
- ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
- PrintStream err = new PrintStream(bufErr);
- admin.getReconfigurableProperties(nodeType, address, out, err);
- Scanner scanner = new Scanner(bufOut.toString());
- List<String> outputs = Lists.newArrayList();
- while (scanner.hasNextLine()) {
- outputs.add(scanner.nextLine());
- }
- return outputs;
+ @Test(timeout = 30000)
+ public void testNameNodeStartReconfiguration() throws IOException {
+ final String address = namenode.getHostAndPort();
+ final List<String> outs = Lists.newArrayList();
+ final List<String> errs = Lists.newArrayList();
+ startReconfiguration("namenode", address, outs, errs);
+ assertEquals(0, outs.size());
+ assertTrue(errs.size() > 1);
+ assertThat(
+ errs.get(0),
+ is(allOf(containsString("Namenode"), containsString("reconfiguring:"),
+ containsString("startReconfiguration"),
+ containsString("is not implemented"),
+ containsString("UnsupportedOperationException"))));
}
@Test(timeout = 30000)
- public void testGetReconfigAllowedProperties() throws IOException {
- final int port = datanode.getIpcPort();
- final String address = "localhost:" + port;
- List<String> outputs =
- getReconfigurationAllowedProperties("datanode", address);
- assertEquals(3, outputs.size());
- assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
- outputs.get(1));
+ public void testNameNodeGetReconfigurableProperties() throws IOException {
+ final String address = namenode.getHostAndPort();
+ final List<String> outs = Lists.newArrayList();
+ final List<String> errs = Lists.newArrayList();
+ getReconfigurableProperties("namenode", address, outs, errs);
+ assertEquals(0, outs.size());
+ assertTrue(errs.size() > 1);
+ assertThat(
+ errs.get(0),
+ is(allOf(containsString("Namenode"),
+ containsString("reconfiguration:"),
+ containsString("listReconfigurableProperties"),
+ containsString("is not implemented"),
+ containsString("UnsupportedOperationException"))));
+ }
+
+ @Test(timeout = 30000)
+ public void testNameNodeGetReconfigurationStatus() throws IOException {
+ final String address = namenode.getHostAndPort();
+ final List<String> outs = Lists.newArrayList();
+ final List<String> errs = Lists.newArrayList();
+ getReconfigurationStatus("namenode", address, outs, errs);
+ assertEquals(0, outs.size());
+ assertTrue(errs.size() > 1);
+ assertThat(
+ errs.get(0),
+ is(allOf(containsString("Namenode"),
+ containsString("reloading configuration:"),
+ containsString("getReconfigurationStatus"),
+ containsString("is not implemented"),
+ containsString("UnsupportedOperationException"))));
}
}
\ No newline at end of file