You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2021/01/04 18:38:50 UTC
[hadoop] branch trunk updated: HDFS-15748. RBF: Move the router
related part from hadoop-federation-balance module to hadoop-hdfs-rbf.
Contributed by Jinglun.
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 77299ae HDFS-15748. RBF: Move the router related part from hadoop-federation-balance module to hadoop-hdfs-rbf. Contributed by Jinglun.
77299ae is described below
commit 77299ae992b16066dd61e4fec9ff63b863ae2e21
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Tue Jan 5 00:05:03 2021 +0530
HDFS-15748. RBF: Move the router related part from hadoop-federation-balance module to hadoop-hdfs-rbf. Contributed by Jinglun.
---
.../hadoop-common/src/main/bin/hadoop | 6 ++
hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml | 11 +++
.../hdfs/rbfbalance}/MountTableProcedure.java | 2 +-
.../hdfs/rbfbalance/RouterDistCpProcedure.java | 56 +++++++++++
.../hadoop/hdfs/rbfbalance/RouterFedBalance.java | 108 ++++++++-------------
.../hadoop/hdfs/rbfbalance/package-info.java | 25 +++++
.../hdfs/rbfbalance}/TestMountTableProcedure.java | 2 +-
hadoop-tools/hadoop-federation-balance/pom.xml | 11 ---
.../hadoop/tools/fedbalance/DistCpProcedure.java | 37 +++----
.../apache/hadoop/tools/fedbalance/FedBalance.java | 88 ++---------------
.../hadoop/tools/fedbalance/FedBalanceContext.java | 15 ++-
.../hadoop/tools/fedbalance/FedBalanceOptions.java | 28 ++----
.../src/site/markdown/HDFSFederationBalance.md | 18 ++--
.../tools/fedbalance/TestDistCpProcedure.java | 7 +-
14 files changed, 202 insertions(+), 212 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop b/hadoop-common-project/hadoop-common/src/main/bin/hadoop
index 7d9ffc6..7f46e7e 100755
--- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop
+++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop
@@ -47,6 +47,7 @@ function hadoop_usage
hadoop_add_subcommand "trace" client "view and modify Hadoop tracing settings"
hadoop_add_subcommand "version" client "print the version"
hadoop_add_subcommand "kdiag" client "Diagnose Kerberos Problems"
+ hadoop_add_subcommand "rbfbalance" client "move directories and files across router-based federation namespaces"
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
}
@@ -171,6 +172,11 @@ function hadoopcmd_case
version)
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
;;
+ rbfbalance)
+ HADOOP_CLASSNAME=org.apache.hadoop.hdfs.rbfbalance.RouterFedBalance
+ hadoop_add_to_classpath_tools hadoop-federation-balance
+ hadoop_add_to_classpath_tools hadoop-distcp
+ ;;
*)
HADOOP_CLASSNAME="${subcmd}"
if ! hadoop_validate_classname "${HADOOP_CLASSNAME}"; then
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
index 23e0b8f..41290cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
@@ -70,6 +70,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-federation-balance</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
@@ -86,6 +91,12 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<type>test-jar</type>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-federation-balance</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/MountTableProcedure.java
similarity index 99%
rename from hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java
rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/MountTableProcedure.java
index 17bc828..8bd39d1 100644
--- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/MountTableProcedure.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.tools.fedbalance;
+package org.apache.hadoop.hdfs.rbfbalance;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java
new file mode 100644
index 0000000..b07f3b2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterDistCpProcedure.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.rbfbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
+import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
+
+import java.io.IOException;
+
+/**
+ * Copy data through distcp in a router-based federation cluster. Write is
+ * disabled by making the mount entry readonly instead of changing permission.
+ */
+public class RouterDistCpProcedure extends DistCpProcedure {
+
+  public RouterDistCpProcedure() {}
+
+  public RouterDistCpProcedure(String name, String nextProcedure,
+      long delayDuration, FedBalanceContext context) throws IOException {
+    super(name, nextProcedure, delayDuration, context);
+  }
+
+  /**
+   * Set the mount entry readonly to disable write and advance to FINAL_DISTCP.
+   */
+  @Override
+  protected void disableWrite(FedBalanceContext context) throws IOException {
+    Configuration conf = context.getConf();
+    MountTableProcedure.disableWrite(context.getMount(), conf);
+    updateStage(Stage.FINAL_DISTCP);
+  }
+
+  /**
+   * Enable write.
+   */
+  @Override
+  protected void enableWrite() throws IOException {
+    // No-op: the source permission was never changed, so nothing to restore.
+  }
+}
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterFedBalance.java
similarity index 79%
copy from hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
copy to hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterFedBalance.java
index c850798..f99a2f1 100644
--- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/RouterFedBalance.java
@@ -15,25 +15,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.tools.fedbalance;
+package org.apache.hadoop.hdfs.rbfbalance;
-import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs;
+import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.tools.fedbalance.TrashProcedure;
+import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
@@ -44,44 +47,38 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
-import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.ROUTER;
+import static org.apache.hadoop.tools.fedbalance.FedBalance.FED_BALANCE_DEFAULT_XML;
+import static org.apache.hadoop.tools.fedbalance.FedBalance.FED_BALANCE_SITE_XML;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.CLI_OPTIONS;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.FORCE_CLOSE_OPEN;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.MAP;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.BANDWIDTH;
-import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.TRASH;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DELAY_DURATION;
-import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.CLI_OPTIONS;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DIFF_THRESHOLD;
-import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.TRASH;
/**
- * Balance data from src cluster to dst cluster with distcp.
+ * Balance data in router-based federation cluster. From src sub-namespace to
+ * dst sub-namespace with distcp.
*
* 1. Move data from the source path to the destination path with distcp.
* 2. Update the the mount entry.
* 3. Delete the source path to trash.
*/
-public class FedBalance extends Configured implements Tool {
+public class RouterFedBalance extends Configured implements Tool {
public static final Logger LOG =
- LoggerFactory.getLogger(FedBalance.class);
+ LoggerFactory.getLogger(RouterFedBalance.class);
private static final String SUBMIT_COMMAND = "submit";
private static final String CONTINUE_COMMAND = "continue";
- private static final String NO_MOUNT = "no-mount";
private static final String DISTCP_PROCEDURE = "distcp-procedure";
private static final String MOUNT_TABLE_PROCEDURE = "mount-table-procedure";
private static final String TRASH_PROCEDURE = "trash-procedure";
- private static final String FED_BALANCE_DEFAULT_XML =
- "hdfs-fedbalance-default.xml";
- private static final String FED_BALANCE_SITE_XML = "hdfs-fedbalance-site.xml";
-
/**
* This class helps building the balance job.
*/
private class Builder {
- /* Balancing in an rbf cluster. */
- private boolean routerCluster = false;
/* Force close all open files while there is no diff. */
private boolean forceCloseOpen = false;
/* Max number of concurrent maps to use for copy. */
@@ -89,7 +86,7 @@ public class FedBalance extends Configured implements Tool {
/* Specify bandwidth per map in MB. */
private int bandwidth = 10;
/* Specify the trash behaviour of the source path. */
- private TrashOption trashOpt = TrashOption.TRASH;
+ private FedBalanceConfigs.TrashOption trashOpt = TrashOption.TRASH;
/* Specify the duration(millie seconds) when the procedure needs retry. */
private long delayDuration = TimeUnit.SECONDS.toMillis(1);
/* Specify the threshold of diff entries. */
@@ -105,15 +102,6 @@ public class FedBalance extends Configured implements Tool {
}
/**
- * Whether balancing in an rbf cluster.
- * @param value true if it's running in a router-based federation cluster.
- */
- public Builder setRouterCluster(boolean value) {
- this.routerCluster = value;
- return this;
- }
-
- /**
* Whether force close all open files while there is no diff.
* @param value true if force close all the open files.
*/
@@ -177,40 +165,26 @@ public class FedBalance extends Configured implements Tool {
if (dst.toUri().getAuthority() == null) {
throw new IOException("The destination cluster must be specified.");
}
- if (routerCluster) { // router-based federation.
- Path src = getSrcPath(inputSrc);
- String mount = inputSrc;
- context = new FedBalanceContext.Builder(src, dst, mount, getConf())
- .setForceCloseOpenFiles(forceCloseOpen)
- .setUseMountReadOnly(routerCluster).setMapNum(map)
- .setBandwidthLimit(bandwidth).setTrash(trashOpt)
- .setDelayDuration(delayDuration)
- .setDiffThreshold(diffThreshold).build();
- } else { // normal federation cluster.
- Path src = new Path(inputSrc);
- if (src.toUri().getAuthority() == null) {
- throw new IOException("The source cluster must be specified.");
- }
- context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf())
- .setForceCloseOpenFiles(forceCloseOpen)
- .setUseMountReadOnly(routerCluster).setMapNum(map)
- .setBandwidthLimit(bandwidth).setTrash(trashOpt)
- .setDiffThreshold(diffThreshold).build();
- }
+ Path src = getSrcPath(inputSrc);
+ String mount = inputSrc;
+ context = new FedBalanceContext.Builder(src, dst, mount, getConf())
+ .setForceCloseOpenFiles(forceCloseOpen).setUseMountReadOnly(true)
+ .setMapNum(map).setBandwidthLimit(bandwidth).setTrash(trashOpt)
+ .setDelayDuration(delayDuration).setDiffThreshold(diffThreshold)
+ .build();
LOG.info(context.toString());
// Construct the balance job.
BalanceJob.Builder<BalanceProcedure> builder = new BalanceJob.Builder<>();
- DistCpProcedure dcp =
- new DistCpProcedure(DISTCP_PROCEDURE, null, delayDuration, context);
+ RouterDistCpProcedure dcp =
+ new RouterDistCpProcedure(DISTCP_PROCEDURE, null, delayDuration,
+ context);
builder.nextProcedure(dcp);
- if (routerCluster) {
- MountTableProcedure mtp =
- new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration,
- inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(),
- getConf());
- builder.nextProcedure(mtp);
- }
+ MountTableProcedure mtp =
+ new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration,
+ inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(),
+ getConf());
+ builder.nextProcedure(mtp);
TrashProcedure tp =
new TrashProcedure(TRASH_PROCEDURE, null, delayDuration, context);
builder.nextProcedure(tp);
@@ -218,15 +192,14 @@ public class FedBalance extends Configured implements Tool {
}
}
- public FedBalance() {
+ public RouterFedBalance() {
super();
}
@Override
public int run(String[] args) throws Exception {
CommandLineParser parser = new GnuParser();
- CommandLine command =
- parser.parse(FedBalanceOptions.CLI_OPTIONS, args, true);
+ CommandLine command = parser.parse(CLI_OPTIONS, args, true);
String[] leftOverArgs = command.getArgs();
if (leftOverArgs == null || leftOverArgs.length < 1) {
printUsage();
@@ -291,7 +264,6 @@ public class FedBalance extends Configured implements Tool {
throws IOException {
Builder builder = new Builder(inputSrc, inputDst);
// parse options.
- builder.setRouterCluster(command.hasOption(ROUTER.getOpt()));
builder.setForceCloseOpen(command.hasOption(FORCE_CLOSE_OPEN.getOpt()));
if (command.hasOption(MAP.getOpt())) {
builder.setMap(Integer.parseInt(command.getOptionValue(MAP.getOpt())));
@@ -371,7 +343,7 @@ public class FedBalance extends Configured implements Tool {
private void printUsage() {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(
- "fedbalance OPTIONS [submit|continue] <src> <target>\n\nOPTIONS",
+ "rbfbalance OPTIONS [submit|continue] <src> <target>\n\nOPTIONS",
CLI_OPTIONS);
}
@@ -391,19 +363,19 @@ public class FedBalance extends Configured implements Tool {
}
/**
- * Main function of the FedBalance program. Parses the input arguments and
- * invokes the FedBalance::run() method, via the ToolRunner.
- * @param argv Command-line arguments sent to FedBalance.
+ * Main function of the RouterFedBalance program. Parses the input arguments
+ * and invokes the RouterFedBalance::run() method, via the ToolRunner.
+ * @param argv Command-line arguments sent to RouterFedBalance.
*/
public static void main(String[] argv) {
Configuration conf = getDefaultConf();
- FedBalance fedBalance = new FedBalance();
+ RouterFedBalance fedBalance = new RouterFedBalance();
fedBalance.setConf(conf);
int exitCode;
try {
exitCode = ToolRunner.run(fedBalance, argv);
} catch (Exception e) {
- LOG.warn("Couldn't complete FedBalance operation.", e);
+ LOG.warn("Couldn't complete RouterFedBalance operation.", e);
exitCode = -1;
}
System.exit(exitCode);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/package-info.java
new file mode 100644
index 0000000..ff6a1d2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/rbfbalance/package-info.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tools for balancing data between the sub-namespaces of a router-based
+ * federation cluster. The command line entry point is RouterFedBalance.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.hdfs.rbfbalance;
+import org.apache.hadoop.classification.InterfaceAudience;
diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance/TestMountTableProcedure.java
similarity index 99%
rename from hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java
rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance/TestMountTableProcedure.java
index 9dd4e5d..4f94c0e 100644
--- a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance/TestMountTableProcedure.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.tools.fedbalance;
+package org.apache.hadoop.hdfs.rbfbalance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
diff --git a/hadoop-tools/hadoop-federation-balance/pom.xml b/hadoop-tools/hadoop-federation-balance/pom.xml
index cf79e17..588bb98 100644
--- a/hadoop-tools/hadoop-federation-balance/pom.xml
+++ b/hadoop-tools/hadoop-federation-balance/pom.xml
@@ -104,17 +104,6 @@
<type>test-jar</type>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs-rbf</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs-rbf</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
index 33d37be..fa4a088 100644
--- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
@@ -155,7 +155,7 @@ public class DistCpProcedure extends BalanceProcedure {
diffDistCp();
return false;
case DISABLE_WRITE:
- disableWrite();
+ disableWrite(context);
return false;
case FINAL_DISTCP:
finalDistCp();
@@ -238,24 +238,29 @@ public class DistCpProcedure extends BalanceProcedure {
}
/**
- * Disable write either by making the mount entry readonly or cancelling the
- * execute permission of the source path.
+ * Disable write by cancelling the execute permission of the source path.
+ * TODO: Disable the super user from writing.
+ * @param fbcontext the context.
+ * @throws IOException if can't disable write.
*/
- void disableWrite() throws IOException {
- if (useMountReadOnly) {
- String mount = context.getMount();
- MountTableProcedure.disableWrite(mount, conf);
- } else {
- // Save and cancel permission.
- FileStatus status = srcFs.getFileStatus(src);
- fPerm = status.getPermission();
- acl = srcFs.getAclStatus(src);
- srcFs.setPermission(src, FsPermission.createImmutable((short) 0));
- }
+ protected void disableWrite(FedBalanceContext fbcontext) throws IOException {
+ // Save and cancel permission.
+ FileStatus status = srcFs.getFileStatus(src);
+ fPerm = status.getPermission();
+ acl = srcFs.getAclStatus(src);
+ srcFs.setPermission(src, FsPermission.createImmutable((short) 0));
updateStage(Stage.FINAL_DISTCP);
}
/**
+ * Enable write.
+ * @throws IOException if can't enable write.
+ */
+ protected void enableWrite() throws IOException {
+ restorePermission();
+ }
+
+ /**
* Enable write by restoring the x permission.
*/
void restorePermission() throws IOException {
@@ -297,9 +302,7 @@ public class DistCpProcedure extends BalanceProcedure {
}
void finish() throws IOException {
- if (!useMountReadOnly) {
- restorePermission();
- }
+ enableWrite();
if (srcFs.exists(src)) {
cleanupSnapshot(srcFs, src);
}
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
index c850798..64805c0 100644
--- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
@@ -27,24 +27,17 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
-import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
-import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
-import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
-import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
-import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.ROUTER;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.FORCE_CLOSE_OPEN;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.MAP;
import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.BANDWIDTH;
@@ -58,8 +51,7 @@ import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
* Balance data from src cluster to dst cluster with distcp.
*
* 1. Move data from the source path to the destination path with distcp.
- * 2. Update the the mount entry.
- * 3. Delete the source path to trash.
+ * 2. Delete the source path to trash.
*/
public class FedBalance extends Configured implements Tool {
@@ -69,19 +61,16 @@ public class FedBalance extends Configured implements Tool {
private static final String CONTINUE_COMMAND = "continue";
private static final String NO_MOUNT = "no-mount";
private static final String DISTCP_PROCEDURE = "distcp-procedure";
- private static final String MOUNT_TABLE_PROCEDURE = "mount-table-procedure";
private static final String TRASH_PROCEDURE = "trash-procedure";
- private static final String FED_BALANCE_DEFAULT_XML =
+ public static final String FED_BALANCE_DEFAULT_XML =
"hdfs-fedbalance-default.xml";
- private static final String FED_BALANCE_SITE_XML = "hdfs-fedbalance-site.xml";
+ public static final String FED_BALANCE_SITE_XML = "hdfs-fedbalance-site.xml";
/**
* This class helps building the balance job.
*/
private class Builder {
- /* Balancing in an rbf cluster. */
- private boolean routerCluster = false;
/* Force close all open files while there is no diff. */
private boolean forceCloseOpen = false;
/* Max number of concurrent maps to use for copy. */
@@ -105,15 +94,6 @@ public class FedBalance extends Configured implements Tool {
}
/**
- * Whether balancing in an rbf cluster.
- * @param value true if it's running in a router-based federation cluster.
- */
- public Builder setRouterCluster(boolean value) {
- this.routerCluster = value;
- return this;
- }
-
- /**
* Whether force close all open files while there is no diff.
* @param value true if force close all the open files.
*/
@@ -177,26 +157,14 @@ public class FedBalance extends Configured implements Tool {
if (dst.toUri().getAuthority() == null) {
throw new IOException("The destination cluster must be specified.");
}
- if (routerCluster) { // router-based federation.
- Path src = getSrcPath(inputSrc);
- String mount = inputSrc;
- context = new FedBalanceContext.Builder(src, dst, mount, getConf())
- .setForceCloseOpenFiles(forceCloseOpen)
- .setUseMountReadOnly(routerCluster).setMapNum(map)
- .setBandwidthLimit(bandwidth).setTrash(trashOpt)
- .setDelayDuration(delayDuration)
- .setDiffThreshold(diffThreshold).build();
- } else { // normal federation cluster.
- Path src = new Path(inputSrc);
- if (src.toUri().getAuthority() == null) {
- throw new IOException("The source cluster must be specified.");
- }
- context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf())
- .setForceCloseOpenFiles(forceCloseOpen)
- .setUseMountReadOnly(routerCluster).setMapNum(map)
- .setBandwidthLimit(bandwidth).setTrash(trashOpt)
- .setDiffThreshold(diffThreshold).build();
+ Path src = new Path(inputSrc);
+ if (src.toUri().getAuthority() == null) {
+ throw new IOException("The source cluster must be specified.");
}
+ context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf())
+ .setForceCloseOpenFiles(forceCloseOpen).setUseMountReadOnly(false)
+ .setMapNum(map).setBandwidthLimit(bandwidth).setTrash(trashOpt)
+ .setDiffThreshold(diffThreshold).build();
LOG.info(context.toString());
// Construct the balance job.
@@ -204,13 +172,6 @@ public class FedBalance extends Configured implements Tool {
DistCpProcedure dcp =
new DistCpProcedure(DISTCP_PROCEDURE, null, delayDuration, context);
builder.nextProcedure(dcp);
- if (routerCluster) {
- MountTableProcedure mtp =
- new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration,
- inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(),
- getConf());
- builder.nextProcedure(mtp);
- }
TrashProcedure tp =
new TrashProcedure(TRASH_PROCEDURE, null, delayDuration, context);
builder.nextProcedure(tp);
@@ -291,7 +252,6 @@ public class FedBalance extends Configured implements Tool {
throws IOException {
Builder builder = new Builder(inputSrc, inputDst);
// parse options.
- builder.setRouterCluster(command.hasOption(ROUTER.getOpt()));
builder.setForceCloseOpen(command.hasOption(FORCE_CLOSE_OPEN.getOpt()));
if (command.hasOption(MAP.getOpt())) {
builder.setMap(Integer.parseInt(command.getOptionValue(MAP.getOpt())));
@@ -340,34 +300,6 @@ public class FedBalance extends Configured implements Tool {
return 0;
}
- /**
- * Get src uri from Router.
- */
- private Path getSrcPath(String fedPath) throws IOException {
- String address = getConf().getTrimmed(
- RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
- RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
- InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
- RouterClient rClient = new RouterClient(routerSocket, getConf());
- try {
- MountTableManager mountTable = rClient.getMountTableManager();
- MountTable entry = MountTableProcedure.getMountEntry(fedPath, mountTable);
- if (entry == null) {
- throw new IllegalArgumentException(
- "The mount point doesn't exist. path=" + fedPath);
- } else if (entry.getDestinations().size() > 1) {
- throw new IllegalArgumentException(
- "The mount point has more than one destination. path=" + fedPath);
- } else {
- String ns = entry.getDestinations().get(0).getNameserviceId();
- String path = entry.getDestinations().get(0).getDest();
- return new Path("hdfs://" + ns + path);
- }
- } finally {
- rClient.close();
- }
- }
-
private void printUsage() {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
index f4f5700..ec47a94 100644
--- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
@@ -194,7 +194,7 @@ public class FedBalanceContext implements Writable {
return builder.toString();
}
- static class Builder {
+ public static class Builder {
private final Path src;
private final Path dst;
private final String mount;
@@ -215,7 +215,7 @@ public class FedBalanceContext implements Writable {
* @param mount the mount point to be balanced.
* @param conf the configuration.
*/
- Builder(Path src, Path dst, String mount, Configuration conf) {
+ public Builder(Path src, Path dst, String mount, Configuration conf) {
this.src = src;
this.dst = dst;
this.mount = mount;
@@ -225,6 +225,7 @@ public class FedBalanceContext implements Writable {
/**
* Force close open files.
* @param value true if force close all the open files.
+ * @return the builder.
*/
public Builder setForceCloseOpenFiles(boolean value) {
this.forceCloseOpenFiles = value;
@@ -234,6 +235,7 @@ public class FedBalanceContext implements Writable {
/**
* Use mount point readonly to disable write.
* @param value true if disabling write by setting mount point readonly.
+ * @return the builder.
*/
public Builder setUseMountReadOnly(boolean value) {
this.useMountReadOnly = value;
@@ -243,6 +245,7 @@ public class FedBalanceContext implements Writable {
/**
* The map number of the distcp job.
* @param value the map number of the distcp.
+ * @return the builder.
*/
public Builder setMapNum(int value) {
this.mapNum = value;
@@ -252,6 +255,7 @@ public class FedBalanceContext implements Writable {
/**
* The bandwidth limit of the distcp job(MB).
* @param value the bandwidth.
+ * @return the builder.
*/
public Builder setBandwidthLimit(int value) {
this.bandwidthLimit = value;
@@ -261,7 +265,8 @@ public class FedBalanceContext implements Writable {
/**
* Specify the trash behaviour after all the data is sync to the target.
* @param value the trash option.
- * */
+ * @return the builder.
+ */
public Builder setTrash(TrashOption value) {
this.trashOpt = value;
return this;
@@ -269,6 +274,8 @@ public class FedBalanceContext implements Writable {
/**
* Specify the delayed duration when the procedures need to retry.
+ * @param value the delay duration.
+ * @return the builder.
*/
public Builder setDelayDuration(long value) {
this.delayDuration = value;
@@ -277,6 +284,8 @@ public class FedBalanceContext implements Writable {
/**
* Specify the threshold of diff entries.
+ * @param value the diff threshold.
+ * @return the builder.
*/
public Builder setDiffThreshold(int value) {
this.diffThreshold = value;
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java
index d7be6a8..4df3f50 100644
--- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java
@@ -31,42 +31,31 @@ public final class FedBalanceOptions {
private FedBalanceOptions() {}
/**
- * Run in router-based federation mode.
- */
- final static Option ROUTER = new Option("router", false,
- "If this option is set then the command runs in router mode."
- + " The source path is taken as a mount point. It will disable write"
- + " by setting the mount point readonly. Otherwise the command works"
- + " in normal federation mode. The source path is taken as the full"
- + " path. It will disable write by cancelling all permissions of the"
- + " source path.");
-
- /**
* If true, in DIFF_DISTCP stage it will force close all open files when
* there is no diff between the source path and the dst path. Otherwise
* the DIFF_DISTCP stage will wait until there is no open files. The
* default value is `false`.
*/
- final static Option FORCE_CLOSE_OPEN = new Option("forceCloseOpen", false,
- "Force close all open files if the src and dst are synced.");
+ public final static Option FORCE_CLOSE_OPEN = new Option("forceCloseOpen",
+ false, "Force close all open files if the src and dst are synced.");
/**
* Max number of maps to use during copy. DistCp will split work as equally
* as possible among these maps.
*/
- final static Option MAP =
+ public final static Option MAP =
new Option("map", true, "Max number of concurrent maps to use for copy");
/**
* Specify bandwidth per map in MB, accepts bandwidth as a fraction.
*/
- final static Option BANDWIDTH =
+ public final static Option BANDWIDTH =
new Option("bandwidth", true, "Specify bandwidth per map in MB.");
/**
* Specify the delayed duration(millie seconds) to retry the Job.
*/
- final static Option DELAY_DURATION = new Option("delay", true,
+ public final static Option DELAY_DURATION = new Option("delay", true,
"This specifies the delayed duration(millie seconds) when the job"
+ " needs to retry. A job may retry many times and check the state"
+ " when it waits for the distcp job to finish.");
@@ -74,7 +63,7 @@ public final class FedBalanceOptions {
/**
* Specify the threshold of diff entries.
*/
- final static Option DIFF_THRESHOLD = new Option("diffThreshold", true,
+ public final static Option DIFF_THRESHOLD = new Option("diffThreshold", true,
"This specifies the threshold of the diff entries that used in"
+ " incremental copy stage. If the diff entries size is no greater"
+ " than this threshold and the open files check is satisfied"
@@ -86,17 +75,16 @@ public final class FedBalanceOptions {
* Move the source path to trash after all the data are sync to target, or
* delete the source directly, or skip both trash and deletion.
*/
- final static Option TRASH = new Option("moveToTrash", true,
+ public final static Option TRASH = new Option("moveToTrash", true,
"Move the source path to trash, or delete the source path directly,"
+ " or skip both trash and deletion. This accepts 3 values: trash,"
+ " delete and skip. By default the server side trash interval is"
+ " used. If the trash is disabled in the server side, the default"
+ " trash interval 60 minutes is used.");
- final static Options CLI_OPTIONS = new Options();
+ public final static Options CLI_OPTIONS = new Options();
static {
- CLI_OPTIONS.addOption(ROUTER);
CLI_OPTIONS.addOption(FORCE_CLOSE_OPEN);
CLI_OPTIONS.addOption(MAP);
CLI_OPTIONS.addOption(BANDWIDTH);
diff --git a/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md b/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md
index 03e6e60..c9d643b 100644
--- a/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md
+++ b/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md
@@ -45,10 +45,9 @@ Usage
The command below runs an hdfs federation balance job. The first parameter is
the mount entry. The second one is the target path which must include the
- target cluster. The option `-router` indicates this is in router-based
- federation mode.
+ target cluster.
- bash$ /bin/hadoop fedbalance -router submit /foo/src hdfs://namespace-1/foo/dst
+ bash$ /bin/hadoop rbfbalance submit /foo/src hdfs://namespace-1/foo/dst
It copies data from hdfs://namespace-0/foo/src to hdfs://namespace-1/foo/dst
incrementally and finally updates the mount entry to:
@@ -59,7 +58,7 @@ Usage
If the hadoop shell process exits unexpectedly, we can use the command below
to continue the unfinished job:
- bash$ /bin/hadoop fedbalance continue
+ bash$ /bin/hadoop rbfbalance continue
This will scan the journal to find all the unfinished jobs, recover and
continue to execute them.
@@ -77,8 +76,8 @@ Usage
* the router-based federation mode (RBF mode).
* the normal federation mode.
- By default the command runs in the normal federation mode. You can specify the
- rbf mode by using the option `-router`.
+ The command `rbfbalance` runs in router-based federation mode. The command
+ `fedbalance` runs in normal federation mode.
In the rbf mode the first parameter is taken as the mount point. It disables
write by setting the mount point readonly.
@@ -91,11 +90,10 @@ Usage
### Command Options
-Command `submit` has 5 options:
+Command `submit` has 4 options:
| Option key | Description | Default |
| ------------------------------ | ------------------------------------ | ------- |
-| -router | Run in router-based federation mode. | Normal federation mode. |
| -forceCloseOpen | Force close all open files when there is no diff in the DIFF_DISTCP stage. | Wait until there are no open files. |
| -map | Max number of concurrent maps to use for copy. | 10 |
| -bandwidth | Specify bandwidth per map in MB. | 10 |
@@ -106,7 +104,7 @@ Command `submit` has 5 options:
### Configuration Options
--------------------
-Set configuration options at fedbalance-site.xml.
+Set configuration options at hdfs-fedbalance-site.xml.
| Configuration key | Description | Default |
| ------------------------------ | ------------------------------------ | ------- |
@@ -165,7 +163,7 @@ Architecture of HDFS Federation Balance
* MountTableProcedure: This procedure updates the mount entry in Router. The
readonly is unset and the destination is updated of the mount point. This
- procedure is activated only when option `-router`.
+ procedure is activated only in router-based federation mode.
* TrashProcedure: This procedure moves the source path to trash.
diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java
index ea5a8a0..9f554af 100644
--- a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java
@@ -281,7 +281,7 @@ public class TestDistCpProcedure {
FedBalanceContext context = buildContext(src, dst, MOUNT);
DistCpProcedure dcProcedure =
new DistCpProcedure("distcp-procedure", null, 1000, context);
- dcProcedure.disableWrite();
+ dcProcedure.disableWrite(context);
dcProcedure.finish();
// Verify path and permission.
@@ -317,7 +317,8 @@ public class TestDistCpProcedure {
dcp[0] = serializeProcedure(dcp[0]);
executeProcedure(dcp[0], Stage.DISABLE_WRITE, () -> dcp[0].diffDistCp());
dcp[0] = serializeProcedure(dcp[0]);
- executeProcedure(dcp[0], Stage.FINAL_DISTCP, () -> dcp[0].disableWrite());
+ executeProcedure(dcp[0], Stage.FINAL_DISTCP,
+ () -> dcp[0].disableWrite(context));
dcp[0] = serializeProcedure(dcp[0]);
OutputStream out = fs.append(new Path(src, "b/c"));
executeProcedure(dcp[0], Stage.FINISH, () -> dcp[0].finalDistCp());
@@ -372,7 +373,7 @@ public class TestDistCpProcedure {
new DistCpProcedure("distcp-procedure", null, 1000, context);
assertNotEquals(0, fs.getFileStatus(src).getPermission().toShort());
executeProcedure(dcProcedure, Stage.FINAL_DISTCP,
- () -> dcProcedure.disableWrite());
+ () -> dcProcedure.disableWrite(context));
assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
cleanup(fs, new Path(testRoot));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org