You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2020/06/18 05:34:08 UTC

[hadoop] branch trunk updated: HDFS-15346. FedBalance tool implementation. Contributed by Jinglun.

This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9cbd76c  HDFS-15346. FedBalance tool implementation. Contributed by Jinglun.
9cbd76c is described below

commit 9cbd76cc775b58dfedb943f971b3307ec5702f13
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Thu Jun 18 13:33:25 2020 +0800

    HDFS-15346. FedBalance tool implementation. Contributed by Jinglun.
---
 .../src/main/resources/assemblies/hadoop-tools.xml |  15 +
 .../apache/hadoop/hdfs/protocol/HdfsConstants.java |   2 +
 hadoop-project/pom.xml                             |  17 +
 hadoop-tools/hadoop-federation-balance/pom.xml     | 249 ++++++++
 .../tools/fedbalance/DistCpBalanceOptions.java     |  95 +++
 .../hadoop/tools/fedbalance/DistCpProcedure.java   | 635 +++++++++++++++++++++
 .../apache/hadoop/tools/fedbalance/FedBalance.java | 377 ++++++++++++
 .../hadoop/tools/fedbalance/FedBalanceConfigs.java |  19 +-
 .../hadoop/tools/fedbalance/FedBalanceContext.java | 286 ++++++++++
 .../tools/fedbalance/MountTableProcedure.java      | 244 ++++++++
 .../hadoop/tools/fedbalance/TrashProcedure.java    | 112 ++++
 .../hadoop/tools/fedbalance}/package-info.java     |  14 +-
 .../tools/fedbalance}/procedure/BalanceJob.java    |   2 +-
 .../fedbalance}/procedure/BalanceJournal.java      |   2 +-
 .../procedure/BalanceJournalInfoHDFS.java          |   8 +-
 .../fedbalance}/procedure/BalanceProcedure.java    |   4 +-
 .../procedure/BalanceProcedureScheduler.java       |   8 +-
 .../tools/fedbalance}/procedure/package-info.java  |   2 +-
 .../shellprofile.d/hadoop-federation-balance.sh    |  38 ++
 .../tools/fedbalance/TestDistCpProcedure.java      | 446 +++++++++++++++
 .../tools/fedbalance/TestMountTableProcedure.java  | 222 +++++++
 .../tools/fedbalance/TestTrashProcedure.java       | 102 ++++
 .../fedbalance}/procedure/MultiPhaseProcedure.java |   2 +-
 .../fedbalance}/procedure/RecordProcedure.java     |   2 +-
 .../fedbalance}/procedure/RetryProcedure.java      |   2 +-
 .../procedure/TestBalanceProcedureScheduler.java   |   7 +-
 .../procedure/UnrecoverableProcedure.java          |   2 +-
 .../tools/fedbalance}/procedure/WaitProcedure.java |   2 +-
 hadoop-tools/hadoop-tools-dist/pom.xml             |   5 +
 hadoop-tools/pom.xml                               |   1 +
 30 files changed, 2887 insertions(+), 35 deletions(-)

diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
index 054d8c0..db744f5 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
@@ -48,6 +48,14 @@
       <fileMode>0755</fileMode>
     </fileSet>
     <fileSet>
+      <directory>../hadoop-federation-balance/src/main/shellprofile.d</directory>
+      <includes>
+        <include>*</include>
+      </includes>
+      <outputDirectory>/libexec/shellprofile.d</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
       <directory>../hadoop-extras/src/main/shellprofile.d</directory>
       <includes>
         <include>*</include>
@@ -112,6 +120,13 @@
       </includes>
     </fileSet>
     <fileSet>
+      <directory>../hadoop-federation-balance/target</directory>
+      <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
+      <includes>
+        <include>*-sources.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
       <directory>../hadoop-extras/target</directory>
       <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
       <includes>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index ab61e50..a025b9b 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -105,6 +105,8 @@ public final class HdfsConstants {
   public static final String DOT_SNAPSHOT_DIR = ".snapshot";
   public static final String SEPARATOR_DOT_SNAPSHOT_DIR
           = Path.SEPARATOR + DOT_SNAPSHOT_DIR;
+  public static final String DOT_SNAPSHOT_DIR_SEPARATOR =
+      DOT_SNAPSHOT_DIR + Path.SEPARATOR;
   public static final String SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR
       = Path.SEPARATOR + DOT_SNAPSHOT_DIR + Path.SEPARATOR;
   public final static String DOT_RESERVED_STRING = ".reserved";
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index f3a3d76..48928b5 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -329,6 +329,12 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs-rbf</artifactId>
+        <version>${hadoop.version}</version>
+        <type>test-jar</type>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-app</artifactId>
         <version>${hadoop.version}</version>
       </dependency>
@@ -580,6 +586,17 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-federation-balance</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-federation-balance</artifactId>
+        <version>${hadoop.version}</version>
+        <type>test-jar</type>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-datajoin</artifactId>
         <version>${hadoop.version}</version>
       </dependency>
diff --git a/hadoop-tools/hadoop-federation-balance/pom.xml b/hadoop-tools/hadoop-federation-balance/pom.xml
new file mode 100644
index 0000000..cf79e17
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/pom.xml
@@ -0,0 +1,249 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>3.4.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project</relativePath>
+  </parent>
+  <artifactId>hadoop-federation-balance</artifactId>
+  <version>3.4.0-SNAPSHOT</version>
+  <description>Apache Hadoop Federation Balance</description>
+  <name>Apache Hadoop Federation Balance</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <file.encoding>UTF-8</file.encoding>
+    <downloadSources>true</downloadSources>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-app</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-hs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-rbf</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-rbf</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+    <testResources>
+      <testResource>
+        <directory>src/test/resources</directory>
+        <filtering>true</filtering>
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <testFailureIgnore>${ignoreTestFailure}</testFailureIgnore>
+          <forkCount>1</forkCount>
+          <reuseForks>false</reuseForks>
+          <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
+          <argLine>-Xmx1024m</argLine>
+          <includes>
+            <include>**/Test*.java</include>
+          </includes>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+          <systemProperties>
+            <property>
+              <name>test.build.data</name>
+              <value>${basedir}/target/test/data</value>
+            </property>
+            <property>
+              <name>hadoop.log.dir</name>
+              <value>target/test/logs</value>
+            </property>
+            <property>
+              <name>org.apache.commons.logging.Log</name>
+              <value>org.apache.commons.logging.impl.SimpleLog</value>
+            </property>
+            <property>
+              <name>org.apache.commons.logging.simplelog.defaultlog</name>
+              <value>warn</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+            </configuration>
+          </execution>
+          <execution>
+            <id>deplist</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>list</goal>
+            </goals>
+            <configuration>
+              <!-- referenced by a built-in command -->
+              <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-builtin.txt</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>org.apache.hadoop.tools.fedbalance.FedBalance</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <id>prepare-jar</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>prepare-test-jar</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <configuration>
+          <attach>true</attach>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpBalanceOptions.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpBalanceOptions.java
new file mode 100644
index 0000000..704ffd9
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpBalanceOptions.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+/**
+ * Command line options of FedBalance.
+ *
+ * All options are package-private constants and are registered in
+ * {@link #CLI_OPTIONS} by the static initializer at the bottom of the class.
+ */
+public final class DistCpBalanceOptions {
+
+  /**
+   * The private constructor protects this class from being instantiated.
+   */
+  private DistCpBalanceOptions() {}
+
+  /**
+   * Run in router-based federation mode. The option is declared without an
+   * argument, so it is a plain flag: present or absent.
+   */
+  final static Option ROUTER = new Option("router", false,
+      "If this flag is set the command runs in router mode. The source path"
+          + " is taken as a mount point. It will disable write by setting the"
+          + " mount point readonly. Otherwise the command works in normal"
+          + " federation mode. The source path is taken as the full path. It"
+          + " will disable write by cancelling all permissions of the source"
+          + " path. Router mode is off unless this flag is given.");
+
+  /**
+   * If set, in DIFF_DISTCP stage it will force close all open files when
+   * there is no diff between the source path and the dst path. Otherwise
+   * the DIFF_DISTCP stage will wait until there are no open files. The
+   * option is a flag without an argument.
+   */
+  final static Option FORCE_CLOSE_OPEN = new Option("forceCloseOpen", false,
+      "Force close all open files if the src and dst are synced.");
+
+  /**
+   * Max number of maps to use during copy. DistCp will split work as equally
+   * as possible among these maps.
+   */
+  final static Option MAP =
+      new Option("map", true, "Max number of concurrent maps to use for copy");
+
+  /**
+   * Specify bandwidth per map in MB, accepts bandwidth as a fraction.
+   */
+  final static Option BANDWIDTH =
+      new Option("bandwidth", true, "Specify bandwidth per map in MB.");
+
+  /**
+   * Specify the delayed duration (milliseconds) to retry the Job.
+   */
+  final static Option DELAY_DURATION = new Option("delay", true,
+      "This specifies the delayed duration (milliseconds) when the job"
+          + " needs to retry. A job may retry many times and check the state"
+          + " when it waits for the distcp job to finish.");
+
+  /**
+   * Move the source path to trash after all the data is synced to target, or
+   * delete the source directly, or skip both trash and deletion.
+   */
+  final static Option TRASH = new Option("moveToTrash", true,
+      "Move the source path to trash, or delete the source path directly,"
+          + " or skip both trash and deletion. This accepts 3 values: trash,"
+          + " delete and skip. By default the server side trash interval is"
+          + " used. If the trash is disabled in the server side, the default"
+          + " trash interval 60 minutes is used.");
+
+  /**
+   * The full set of options accepted on the command line.
+   */
+  final static Options CLI_OPTIONS = new Options();
+
+  static {
+    CLI_OPTIONS.addOption(ROUTER);
+    CLI_OPTIONS.addOption(FORCE_CLOSE_OPEN);
+    CLI_OPTIONS.addOption(MAP);
+    CLI_OPTIONS.addOption(BANDWIDTH);
+    CLI_OPTIONS.addOption(DELAY_DURATION);
+    CLI_OPTIONS.addOption(TRASH);
+  }
+}
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
new file mode 100644
index 0000000..73fecbf
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
@@ -0,0 +1,635 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.OptionsParser;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.CURRENT_SNAPSHOT_NAME;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.LAST_SNAPSHOT_NAME;
+
+/**
+ * Copy data through distcp. Super user privilege needed.
+ *
+ * PRE_CHECK               :pre-check of src and dst.
+ * INIT_DISTCP             :the first round of distcp.
+ * DIFF_DISTCP             :copy snapshot diff round by round until there is
+ *                          no diff.
+ * DISABLE_WRITE           :disable write operations.
+ * FINAL_DISTCP            :close all open files and do the final round distcp.
+ * FINISH                  :procedure finish.
+ */
+public class DistCpProcedure extends BalanceProcedure {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DistCpProcedure.class);
+
+  /*
+   * Stages of this procedure, listed in the order the stage methods advance
+   * through them via updateStage().
+   */
+  enum Stage {
+    PRE_CHECK, INIT_DISTCP, DIFF_DISTCP, DISABLE_WRITE, FINAL_DISTCP, FINISH
+  }
+
+  private FedBalanceContext context; // the balance context.
+  private Path src; // the source path including the source cluster.
+  private Path dst; // the dst path including the dst cluster.
+  private Configuration conf;
+  private int mapNum; // the number of map tasks.
+  private int bandWidth; // the bandwidth limit of each distcp task.
+  // the id of the current distcp; reset to null once that job has completed.
+  private String jobId; // the id of the current distcp.
+  private Stage stage; // current stage of this procedure.
+
+  /* Force close all open files when there is no diff between src and dst */
+  private boolean forceCloseOpenFiles;
+  /* Disable write by setting the mount point readonly. */
+  private boolean useMountReadOnly;
+
+  // Both are captured by disableWrite() and applied to dst by
+  // restorePermission(); they stay null when the mount-readonly mode is used.
+  private FsPermission fPerm; // the permission of the src.
+  private AclStatus acl; // the acl of the src.
+
+  private JobClient client; // used to look up the submitted distcp by jobId.
+  private DistributedFileSystem srcFs; // fs of the src cluster.
+  private DistributedFileSystem dstFs; // fs of the dst cluster.
+
+  /**
+   * Test only. In unit test we use the LocalJobRunner to run the distcp jobs.
+   * Here we save the job to look up the job status. The localJob won't be
+   * serialized thus won't be recovered.
+   */
+  @VisibleForTesting
+  private Job localJob;
+  /**
+   * Enable test mode. Use LocalJobRunner to run the distcp jobs. When true,
+   * getCurrentJob() looks the job up through localJob instead of the
+   * JobClient.
+   */
+  @VisibleForTesting
+  static boolean enabledForTest = false;
+
+  /**
+   * No-arg constructor; leaves every field at its default value.
+   * NOTE(review): presumably needed so the procedure can be re-created from
+   * its serialized form - confirm against the deserialization code.
+   */
+  public DistCpProcedure() {
+  }
+
+  /**
+   * Builds a procedure that starts at the PRE_CHECK stage and takes all of
+   * its settings from the given balance context.
+   *
+   * @param name the name of the procedure.
+   * @param nextProcedure the name of the next procedure.
+   * @param delayDuration the delay duration when this procedure is delayed.
+   * @param context the federation balance context.
+   * @throws IOException propagated from creating the job client or from
+   *                     resolving the src/dst file systems.
+   */
+  public DistCpProcedure(String name, String nextProcedure, long delayDuration,
+      FedBalanceContext context) throws IOException {
+    super(name, nextProcedure, delayDuration);
+    // Plain settings copied out of the context.
+    this.context = context;
+    this.conf = context.getConf();
+    this.src = context.getSrc();
+    this.dst = context.getDst();
+    this.mapNum = context.getMapNum();
+    this.bandWidth = context.getBandwidthLimit();
+    this.forceCloseOpenFiles = context.getForceCloseOpenFiles();
+    this.useMountReadOnly = context.getUseMountReadOnly();
+    this.stage = Stage.PRE_CHECK;
+    // Handles that may fail to be created: job client first, then src, dst.
+    this.client = new JobClient(conf);
+    this.srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
+    this.dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf);
+  }
+
+  @Override
+  public boolean execute() throws RetryException, IOException {
+    LOG.info("Stage={}", stage.name());
+    switch (stage) {
+    case PRE_CHECK:
+      preCheck();
+      return false;
+    case INIT_DISTCP:
+      initDistCp();
+      return false;
+    case DIFF_DISTCP:
+      diffDistCp();
+      return false;
+    case DISABLE_WRITE:
+      disableWrite();
+      return false;
+    case FINAL_DISTCP:
+      finalDistCp();
+      return false;
+    case FINISH:
+      finish();
+      return true;
+    default:
+      throw new IOException("Unexpected stage=" + stage);
+    }
+  }
+
+  /**
+   * Validates src and dst, then moves on to INIT_DISTCP.
+   *
+   * @throws IOException if src is not a directory, if dst already exists, or
+   *                     if a .snapshot path exists under src.
+   */
+  void preCheck() throws IOException {
+    // The source must be a directory.
+    if (!srcFs.getFileStatus(src).isDirectory()) {
+      throw new IOException(src + " should be a directory.");
+    }
+    // The target must not exist yet.
+    if (dstFs.exists(dst)) {
+      throw new IOException(dst + " already exists.");
+    }
+    // The source must not expose a .snapshot directory.
+    Path snapshotDir = new Path(src, HdfsConstants.DOT_SNAPSHOT_DIR);
+    if (srcFs.exists(snapshotDir)) {
+      throw new IOException(src + " shouldn't enable snapshot.");
+    }
+    updateStage(Stage.INIT_DISTCP);
+  }
+
+  /**
+   * The first round of distcp, copying a snapshot of src to dst.
+   *
+   * With no job outstanding it snapshots src and submits the copy. With a
+   * job outstanding it either waits for it (RetryException), advances to
+   * DIFF_DISTCP on success, or logs the failure and clears the job id.
+   *
+   * @throws RetryException if the submitted job has not completed yet.
+   * @throws IOException propagated from the file system or job calls.
+   */
+  void initDistCp() throws IOException, RetryException {
+    RunningJobStatus job = getCurrentJob();
+    if (job == null) {
+      // Nothing submitted yet: snapshot the source and copy the snapshot.
+      pathCheckBeforeInitDistcp();
+      srcFs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
+      String snapshotPath = src.toString()
+          + HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR
+          + CURRENT_SNAPSHOT_NAME;
+      jobId = submitDistCpJob(snapshotPath, dst.toString(), false);
+      return;
+    }
+    if (!job.isComplete()) {
+      throw new RetryException();
+    }
+    jobId = null; // unset jobId because the job is done.
+    if (job.isSuccessful()) {
+      updateStage(Stage.DIFF_DISTCP);
+    } else {
+      LOG.warn("DistCp failed. Failure={}", job.getFailureInfo());
+    }
+  }
+
+  /**
+   * The distcp copying diffs between LAST_SNAPSHOT_NAME and
+   * CURRENT_SNAPSHOT_NAME.
+   *
+   * With a job outstanding it waits for it and fails on an unsuccessful
+   * result. With no job outstanding it submits another diff round while src
+   * keeps changing, and otherwise advances to DISABLE_WRITE once there are
+   * no open files or force-closing is enabled.
+   *
+   * @throws RetryException if the job is still running, or if src has open
+   *                        files and force-closing is not enabled.
+   * @throws IOException if the distcp job failed, or propagated from the
+   *                     file system or job calls.
+   */
+  void diffDistCp() throws IOException, RetryException {
+    RunningJobStatus job = getCurrentJob();
+    if (job != null) {
+      if (!job.isComplete()) {
+        throw new RetryException(); // wait job complete.
+      }
+      jobId = null;
+      if (!job.isSuccessful()) {
+        throw new IOException("DistCp failed. jobId=" + job.getJobID()
+            + " failure=" + job.getFailureInfo());
+      }
+      LOG.info("DistCp succeeded. jobId={}", job.getJobID());
+      return;
+    }
+    if (verifyDiff()) {
+      // src changed since the current snapshot: copy another diff round.
+      submitDiffDistCp();
+      return;
+    }
+    if (verifyOpenFiles() && !forceCloseOpenFiles) {
+      throw new RetryException();
+    }
+    updateStage(Stage.DISABLE_WRITE);
+  }
+
+  /**
+   * Disable write either by making the mount entry readonly or by saving the
+   * permission and ACL of the source path and then setting its permission
+   * to 0. Advances to FINAL_DISTCP afterwards.
+   *
+   * @throws IOException propagated from the mount table or file system
+   *                     calls.
+   */
+  void disableWrite() throws IOException {
+    if (!useMountReadOnly) {
+      // Remember the current permission and ACL, then cancel all permission.
+      FileStatus srcStatus = srcFs.getFileStatus(src);
+      fPerm = srcStatus.getPermission();
+      acl = srcFs.getAclStatus(src);
+      srcFs.setPermission(src, FsPermission.createImmutable((short) 0));
+    } else {
+      MountTableProcedure.disableWrite(context.getMount(), conf);
+    }
+    updateStage(Stage.FINAL_DISTCP);
+  }
+
+  /**
+   * Apply the permission and ACL saved by disableWrite() to the dst path.
+   * The ACL currently on dst is removed first; the saved ACL entries and the
+   * saved permission are then applied, each only when it is non-null.
+   *
+   * @throws IOException propagated from the dst file system calls.
+   */
+  void restorePermission() throws IOException {
+    // restore permission.
+    dstFs.removeAcl(dst);
+    if (acl != null) {
+      dstFs.modifyAclEntries(dst, acl.getEntries());
+    }
+    if (fPerm != null) {
+      dstFs.setPermission(dst, fPerm);
+    }
+  }
+
+  /**
+   * Close all open files under src, then submit the last distcp with -diff
+   * or check on the one already submitted. Advances to FINISH once that job
+   * has succeeded.
+   *
+   * @throws RetryException if the submitted job has not completed yet.
+   * @throws IOException if the final distcp failed, or propagated from the
+   *                     file system or job calls.
+   */
+  void finalDistCp() throws IOException, RetryException {
+    // Open files are closed on every call, before the job is inspected.
+    closeAllOpenFiles(srcFs, src);
+    RunningJobStatus job = getCurrentJob();
+    if (job == null) {
+      // No final distcp outstanding: submit it.
+      submitDiffDistCp();
+      return;
+    }
+    if (!job.isComplete()) {
+      throw new RetryException();
+    }
+    jobId = null; // unset jobId because the job is done.
+    if (!job.isSuccessful()) {
+      throw new IOException(
+          "Final DistCp failed. Failure: " + job.getFailureInfo());
+    }
+    updateStage(Stage.FINISH);
+  }
+
+  /**
+   * Last stage: restore the saved permission on dst when the permission mode
+   * was used to disable write, then clean up the snapshots of whichever of
+   * src and dst still exists.
+   *
+   * @throws IOException propagated from the file system calls.
+   */
+  void finish() throws IOException {
+    // Permissions were only touched when the mount was not set readonly.
+    if (!useMountReadOnly) {
+      restorePermission();
+    }
+    if (srcFs.exists(src)) {
+      cleanupSnapshot(srcFs, src);
+    }
+    if (dstFs.exists(dst)) {
+      cleanupSnapshot(dstFs, dst);
+    }
+  }
+
+  /**
+   * @return the current stage of this procedure.
+   */
+  @VisibleForTesting
+  Stage getStage() {
+    return stage;
+  }
+
+  /**
+   * Logs the transition and sets the current stage. Either the old or the
+   * new stage may be null; it is then logged as "null".
+   *
+   * @param value the stage to switch to.
+   */
+  @VisibleForTesting
+  void updateStage(Stage value) {
+    LOG.info("Stage updated from {} to {}.",
+        stage == null ? "null" : stage.name(),
+        value == null ? "null" : value.name());
+    stage = value;
+  }
+
+  /**
+   * Submit distcp with -diff option to do the incremental copy.
+   *
+   * |   the source path      |     the dst path     |
+   * | LAST_SNAPSHOT_NAME     |   LAST_SNAPSHOT_NAME |
+   * | CURRENT_SNAPSHOT_NAME  |
+   *
+   * 0. Call enableSnapshot on the dst path (presumably makes it
+   *    snapshottable - confirm against the helper).
+   * 1. Cleanup all the last snapshots. If there are no last snapshots then do
+   *    nothing.
+   * 2. Create the dst path snapshot named the last snapshot.
+   * 3. Rename the source path current snapshot as the last snapshot. The dst
+   *    path last snapshot and the source path last snapshot are the same now.
+   * 4. Create the current snapshot of the source path.
+   * 5. Submit the distcp job. The incremental part is from the source path last
+   *    snapshot to the source path current snapshot.
+   *
+   * The id of the submitted job is stored in jobId.
+   *
+   * @throws IOException propagated from the snapshot or submit calls.
+   */
+  private void submitDiffDistCp() throws IOException {
+    enableSnapshot(dstFs, dst);
+    // Step 1: drop the previous round's snapshots on both sides.
+    deleteSnapshot(srcFs, src, LAST_SNAPSHOT_NAME);
+    deleteSnapshot(dstFs, dst, LAST_SNAPSHOT_NAME);
+    // Step 2.
+    dstFs.createSnapshot(dst, LAST_SNAPSHOT_NAME);
+    // Steps 3 and 4: the old current snapshot becomes the diff baseline.
+    srcFs.renameSnapshot(src, CURRENT_SNAPSHOT_NAME, LAST_SNAPSHOT_NAME);
+    srcFs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
+    // Step 5. NOTE(review): the trailing 'true' presumably selects the -diff
+    // mode (initDistCp passes false) - confirm against submitDistCpJob.
+    jobId = submitDistCpJob(src.toString(), dst.toString(), true);
+  }
+
+  /**
+   * Close all open files. Block until all the files are closed.
+   *
+   * Open files under the path are listed repeatedly and a lease recovery is
+   * requested for each of them, until a listing comes back empty. There is
+   * no pause between rounds.
+   *
+   * @param dfs the file system the open files are listed on and whose leases
+   *            are recovered.
+   * @param path the root path; only its path component is used for listing.
+   * @throws IOException propagated from listing the open files.
+   */
+  private void closeAllOpenFiles(DistributedFileSystem dfs, Path path)
+      throws IOException {
+    String pathStr = path.toUri().getPath();
+    while (true) {
+      RemoteIterator<OpenFileEntry> iterator =
+          dfs.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES), pathStr);
+      if (!iterator.hasNext()) { // all files have been closed.
+        break;
+      }
+      while (iterator.hasNext()) {
+        OpenFileEntry e = iterator.next();
+        try {
+          // Recover the lease on the same file system the entry was listed
+          // on, rather than always on srcFs.
+          dfs.recoverLease(new Path(e.getFilePath()));
+        } catch (IOException ignored) {
+          // Best effort: the file is listed again on the next round if it
+          // is still open.
+        }
+      }
+    }
+  }
+
+  /**
+   * Verify whether the src has changed since CURRENT_SNAPSHOT_NAME snapshot.
+   *
+   * @return true if the src has changed.
+   */
+  private boolean verifyDiff() throws IOException {
+    SnapshotDiffReport diffReport =
+        srcFs.getSnapshotDiffReport(src, CURRENT_SNAPSHOT_NAME, "");
+    return diffReport.getDiffList().size() > 0;
+  }
+
+  /**
+   * Verify whether there is any open files under src.
+   *
+   * @return true if there are open files.
+   */
+  private boolean verifyOpenFiles() throws IOException {
+    RemoteIterator<OpenFileEntry> iterator = srcFs
+        .listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+            src.toString());
+    return iterator.hasNext();
+  }
+
+  private RunningJobStatus getCurrentJob() throws IOException {
+    if (jobId != null) {
+      if (enabledForTest) {
+        return getCurrentLocalJob();
+      } else {
+        RunningJob latestJob = client.getJob(JobID.forName(jobId));
+        return latestJob == null ? null : new YarnRunningJobStatus(latestJob);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Look up the status of the job through the cluster of the locally kept
+   * job handle. Used when enabledForTest is set.
+   *
+   * @return the job status, or null if there is no local job handle or the
+   *         cluster does not know the job.
+   * @throws IOException if the lookup fails or the thread is interrupted
+   *         while waiting for it.
+   */
+  private LocalJobStatus getCurrentLocalJob() throws IOException {
+    if (localJob == null) {
+      return null;
+    }
+    Job latestJob;
+    try {
+      latestJob = localJob.getCluster().getJob(JobID.forName(jobId));
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so the caller can still observe the
+      // interruption after it has been translated to an IOException.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+    return latestJob == null ? null : new LocalJobStatus(latestJob);
+  }
+
+  private void pathCheckBeforeInitDistcp() throws IOException {
+    if (dstFs.exists(dst)) { // clean up.
+      throw new IOException("The dst path=" + dst + " already exists. The admin"
+          + " should delete it before submitting the initial distcp job.");
+    }
+    Path snapshotPath = new Path(src,
+        HdfsConstants.DOT_SNAPSHOT_DIR_SEPARATOR + CURRENT_SNAPSHOT_NAME);
+    if (srcFs.exists(snapshotPath)) {
+      throw new IOException("The src snapshot=" + snapshotPath +
+          " already exists. The admin should delete the snapshot before"
+          + " submitting the initial distcp.");
+    }
+    srcFs.allowSnapshot(src);
+  }
+
+  /**
+   * Build the distcp command line, submit the job and return its id.
+   *
+   * @param srcParam the source argument of distcp.
+   * @param dstParam the target argument of distcp.
+   * @param useSnapshotDiff true to add the -diff option between
+   *        LAST_SNAPSHOT_NAME and CURRENT_SNAPSHOT_NAME.
+   * @return the id of the submitted job.
+   * @throws IOException if parsing the options or submitting the job fails.
+   */
+  private String submitDistCpJob(String srcParam, String dstParam,
+      boolean useSnapshotDiff) throws IOException {
+    List<String> args = new ArrayList<>(
+        Arrays.asList("-async", "-update", "-append", "-pruxgpcab"));
+    if (useSnapshotDiff) {
+      args.addAll(
+          Arrays.asList("-diff", LAST_SNAPSHOT_NAME, CURRENT_SNAPSHOT_NAME));
+    }
+    args.addAll(Arrays.asList("-m", String.valueOf(mapNum)));
+    args.addAll(Arrays.asList("-bandwidth", String.valueOf(bandWidth)));
+    args.addAll(Arrays.asList(srcParam, dstParam));
+
+    // Work on a copy so the job does not modify the configuration kept here.
+    Configuration config = new Configuration(conf);
+    try {
+      DistCp distCp = new DistCp(config,
+          OptionsParser.parse(args.toArray(new String[0])));
+      Job job = distCp.createAndSubmitJob();
+      LOG.info("Submit distcp job={}", job);
+      if (enabledForTest) {
+        // Keep the handle, getCurrentLocalJob() looks the job up through it.
+        localJob = job;
+      }
+      return job.getJobID().toString();
+    } catch (Exception e) {
+      throw new IOException("Submit job failed.", e);
+    }
+  }
+
+  /**
+   * Serialize this procedure. The fields are written in this order: the
+   * parent fields, the context, the optional job id, the stage ordinal, the
+   * optional permission and the optional acl. Each optional field is preceded
+   * by a boolean presence flag. readFields() must read in the same order.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    context.write(out);
+    // Optional job id: presence flag, then the id.
+    if (jobId == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      Text.writeString(out, jobId);
+    }
+    // The stage is stored by ordinal, so the stage field must not be null
+    // here and the decoding depends on the order of the Stage constants.
+    out.writeInt(stage.ordinal());
+    // Optional permission: presence flag, then the permission as a short.
+    if (fPerm == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeShort(fPerm.toShort());
+    }
+    // Optional acl: presence flag, then the byte length and the bytes of the
+    // delimited protobuf message.
+    if (acl == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      ByteArrayOutputStream bout = new ByteArrayOutputStream();
+      PBHelperClient.convert(acl).writeDelimitedTo(bout);
+      byte[] data = bout.toByteArray();
+      out.writeInt(data.length);
+      out.write(data);
+    }
+  }
+
+  /**
+   * Deserialize this procedure. Reads the fields in the order write() emits
+   * them and then rebuilds the members derived from the context: the paths,
+   * the configuration, both file systems, the distcp settings and the job
+   * client.
+   *
+   * @throws IOException if reading fails or the stored stage ordinal does not
+   *         match any Stage constant.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    context = new FedBalanceContext();
+    context.readFields(in);
+    src = context.getSrc();
+    dst = context.getDst();
+    conf = context.getConf();
+    // Optional fields are reset when absent so no stale value survives if
+    // this instance is deserialized more than once.
+    if (in.readBoolean()) {
+      jobId = Text.readString(in);
+    } else {
+      jobId = null;
+    }
+    // Report a corrupt ordinal as an IOException instead of letting the array
+    // access fail with an unchecked exception.
+    int stageOrdinal = in.readInt();
+    Stage[] stages = Stage.values();
+    if (stageOrdinal < 0 || stageOrdinal >= stages.length) {
+      throw new IOException("Invalid stage ordinal " + stageOrdinal + ".");
+    }
+    stage = stages[stageOrdinal];
+    if (in.readBoolean()) {
+      fPerm = FsPermission.read(in);
+    } else {
+      fPerm = null;
+    }
+    if (in.readBoolean()) {
+      // The acl is stored as a length-prefixed buffer holding a delimited
+      // protobuf message.
+      int len = in.readInt();
+      byte[] data = new byte[len];
+      in.readFully(data);
+      ByteArrayInputStream bin = new ByteArrayInputStream(data);
+      AclProtos.GetAclStatusResponseProto proto =
+          AclProtos.GetAclStatusResponseProto.parseDelimitedFrom(bin);
+      acl = PBHelperClient.convert(proto);
+    } else {
+      acl = null;
+    }
+    srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
+    dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf);
+    mapNum = context.getMapNum();
+    bandWidth = context.getBandwidthLimit();
+    forceCloseOpenFiles = context.getForceCloseOpenFiles();
+    useMountReadOnly = context.getUseMountReadOnly();
+    this.client = new JobClient(conf);
+  }
+
+  private static void enableSnapshot(DistributedFileSystem dfs, Path path)
+      throws IOException {
+    if (!dfs.exists(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR))) {
+      dfs.allowSnapshot(path);
+    }
+  }
+
+  static void deleteSnapshot(DistributedFileSystem dfs, Path path,
+      String snapshotName) throws IOException {
+    Path snapshot =
+        new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR_SEPARATOR + snapshotName);
+    if (dfs.exists(snapshot)) {
+      dfs.deleteSnapshot(path, snapshotName);
+    }
+  }
+
+  static void cleanupSnapshot(DistributedFileSystem dfs, Path path)
+      throws IOException {
+    if (dfs.exists(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR))) {
+      FileStatus[] status =
+          dfs.listStatus(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR));
+      for (FileStatus s : status) {
+        deleteSnapshot(dfs, path, s.getPath().getName());
+      }
+      dfs.disallowSnapshot(path);
+    }
+  }
+
+  /**
+   * The view of a submitted job that this procedure needs. It is implemented
+   * by YarnRunningJobStatus and LocalJobStatus.
+   */
+  interface RunningJobStatus {
+    /** @return the id of the job as a string. */
+    String getJobID();
+
+    /** @return true if the job is complete. */
+    boolean isComplete() throws IOException;
+
+    /** @return true if the job is successful. */
+    boolean isSuccessful() throws IOException;
+
+    /** @return the failure info reported for the job. */
+    String getFailureInfo() throws IOException;
+  }
+
+  /**
+   * RunningJobStatus backed by a RunningJob. Every method delegates to the
+   * wrapped job.
+   */
+  private static class YarnRunningJobStatus implements RunningJobStatus {
+
+    /* The wrapped job. */
+    private final RunningJob job;
+
+    YarnRunningJobStatus(RunningJob job) {
+      this.job = job;
+    }
+
+    @Override
+    public String getJobID() {
+      return job.getID().toString();
+    }
+
+    @Override
+    public boolean isComplete() throws IOException {
+      return job.isComplete();
+    }
+
+    @Override
+    public boolean isSuccessful() throws IOException {
+      return job.isSuccessful();
+    }
+
+    @Override
+    public String getFailureInfo() throws IOException {
+      return job.getFailureInfo();
+    }
+  }
+
+  /**
+   * RunningJobStatus backed by a Job handle. Every method delegates to the
+   * wrapped job.
+   */
+  private static class LocalJobStatus implements RunningJobStatus {
+
+    /* The wrapped job. */
+    private final Job testJob;
+
+    LocalJobStatus(Job testJob) {
+      this.testJob = testJob;
+    }
+
+    @Override
+    public String getJobID() {
+      return testJob.getJobID().toString();
+    }
+
+    @Override
+    public boolean isComplete() throws IOException {
+      return testJob.isComplete();
+    }
+
+    @Override
+    public boolean isSuccessful() throws IOException {
+      return testJob.isSuccessful();
+    }
+
+    /**
+     * @return the failure info of the job status.
+     * @throws IOException if fetching the status fails or the thread is
+     *         interrupted while fetching it.
+     */
+    @Override
+    public String getFailureInfo() throws IOException {
+      try {
+        return testJob.getStatus().getFailureInfo();
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so the caller can still observe the
+        // interruption after it has been translated to an IOException.
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
new file mode 100644
index 0000000..adfb40b
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.ROUTER;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.FORCE_CLOSE_OPEN;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.MAP;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.BANDWIDTH;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.TRASH;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.DELAY_DURATION;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.CLI_OPTIONS;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.FEDERATION_BALANCE_CLASS;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+
+/**
+ * Balance data from src cluster to dst cluster with distcp.
+ *
+ * 1. Move data from the source path to the destination path with distcp.
+ * 2. Update the mount entry.
+ * 3. Delete the source path to trash.
+ */
+public class FedBalance extends Configured implements Tool {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FedBalance.class);
+  private static final String SUBMIT_COMMAND = "submit";
+  private static final String CONTINUE_COMMAND = "continue";
+  private static final String NO_MOUNT = "no-mount";
+  private static final String DISTCP_PROCEDURE = "distcp-procedure";
+  private static final String MOUNT_TABLE_PROCEDURE = "mount-table-procedure";
+  private static final String TRASH_PROCEDURE = "trash-procedure";
+
+  /**
+   * This class helps building the balance job. It is an inner class because
+   * build() needs the configuration and the router lookup of the enclosing
+   * FedBalance instance.
+   */
+  private class Builder {
+    /* Balancing in an rbf cluster. */
+    private boolean routerCluster = false;
+    /* Force close all open files while there is no diff. */
+    private boolean forceCloseOpen = false;
+    /* Max number of concurrent maps to use for copy. */
+    private int map = 10;
+    /* Specify bandwidth per map in MB. */
+    private int bandwidth = 10;
+    /* Specify the trash behaviour of the source path. */
+    private TrashOption trashOpt = TrashOption.TRASH;
+    /* Specify the duration(milliseconds) when the procedure needs retry. */
+    private long delayDuration = TimeUnit.SECONDS.toMillis(1);
+    /* The source input. This specifies the source path. */
+    private final String inputSrc;
+    /* The dst input. This specifies the dst path. */
+    private final String inputDst;
+
+    /**
+     * @param inputSrc the source input. A mount point in router mode,
+     *                 otherwise a path including the source cluster.
+     * @param inputDst the dst input. A path including the destination
+     *                 cluster.
+     */
+    Builder(String inputSrc, String inputDst) {
+      this.inputSrc = inputSrc;
+      this.inputDst = inputDst;
+    }
+
+    /**
+     * Whether balancing in an rbf cluster.
+     * @param value true if it's running in a router-based federation cluster.
+     * @return this builder.
+     */
+    public Builder setRouterCluster(boolean value) {
+      this.routerCluster = value;
+      return this;
+    }
+
+    /**
+     * Whether force close all open files while there is no diff.
+     * @param value true if force close all the open files.
+     * @return this builder.
+     */
+    public Builder setForceCloseOpen(boolean value) {
+      this.forceCloseOpen = value;
+      return this;
+    }
+
+    /**
+     * Max number of concurrent maps to use for copy.
+     * @param value the map number of the distcp.
+     * @return this builder.
+     */
+    public Builder setMap(int value) {
+      this.map = value;
+      return this;
+    }
+
+    /**
+     * Specify bandwidth per map in MB.
+     * @param value the bandwidth.
+     * @return this builder.
+     */
+    public Builder setBandWidth(int value) {
+      this.bandwidth = value;
+      return this;
+    }
+
+    /**
+     * Specify the trash behaviour of the source path.
+     * @param value the trash option.
+     * @return this builder.
+     */
+    public Builder setTrashOpt(TrashOption value) {
+      this.trashOpt = value;
+      return this;
+    }
+
+    /**
+     * Specify the duration(milliseconds) when the procedure needs retry.
+     * @param value the delay duration of the job.
+     * @return this builder.
+     */
+    public Builder setDelayDuration(long value) {
+      this.delayDuration = value;
+      return this;
+    }
+
+    /**
+     * Build the balance job. The job runs the distcp procedure, then the
+     * mount table procedure (router mode only) and finally the trash
+     * procedure.
+     *
+     * @return the balance job.
+     * @throws IOException if the destination cluster, or in non-router mode
+     *         the source cluster, is not specified, or if resolving the
+     *         source path from the router fails.
+     */
+    public BalanceJob build() throws IOException {
+      // Construct job context.
+      Path dst = new Path(inputDst);
+      if (dst.toUri().getAuthority() == null) {
+        throw new IOException("The destination cluster must be specified.");
+      }
+      Path src;
+      String mount;
+      if (routerCluster) { // router-based federation.
+        src = getSrcPath(inputSrc);
+        mount = inputSrc;
+      } else { // normal federation cluster.
+        src = new Path(inputSrc);
+        if (src.toUri().getAuthority() == null) {
+          throw new IOException("The source cluster must be specified.");
+        }
+        mount = NO_MOUNT;
+      }
+      // Both modes share one construction path so that every option,
+      // including the delay duration, reaches the context in either mode.
+      FedBalanceContext context =
+          new FedBalanceContext.Builder(src, dst, mount, getConf())
+              .setForceCloseOpenFiles(forceCloseOpen)
+              .setUseMountReadOnly(routerCluster).setMapNum(map)
+              .setBandwidthLimit(bandwidth).setTrash(trashOpt)
+              .setDelayDuration(delayDuration).build();
+
+      LOG.info(context.toString());
+      // Construct the balance job.
+      BalanceJob.Builder<BalanceProcedure> builder = new BalanceJob.Builder<>();
+      DistCpProcedure dcp =
+          new DistCpProcedure(DISTCP_PROCEDURE, null, delayDuration, context);
+      builder.nextProcedure(dcp);
+      if (routerCluster) {
+        MountTableProcedure mtp =
+            new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration,
+                inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(),
+                getConf());
+        builder.nextProcedure(mtp);
+      }
+      TrashProcedure tp =
+          new TrashProcedure(TRASH_PROCEDURE, null, delayDuration, context);
+      builder.nextProcedure(tp);
+      return builder.build();
+    }
+  }
+
+  /**
+   * Create a FedBalance tool. The public no-arg constructor is what main()
+   * relies on when it instantiates the tool class through ReflectionUtils.
+   */
+  public FedBalance() {
+    super();
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    CommandLineParser parser = new GnuParser();
+    CommandLine command =
+        parser.parse(DistCpBalanceOptions.CLI_OPTIONS, args, true);
+    String[] leftOverArgs = command.getArgs();
+    if (leftOverArgs == null || leftOverArgs.length < 1) {
+      printUsage();
+      return -1;
+    }
+    String cmd = leftOverArgs[0];
+    if (cmd.equals(SUBMIT_COMMAND)) {
+      if (leftOverArgs.length < 3) {
+        printUsage();
+        return -1;
+      }
+      String inputSrc = leftOverArgs[1];
+      String inputDst = leftOverArgs[2];
+      return submit(command, inputSrc, inputDst);
+    } else if (cmd.equals(CONTINUE_COMMAND)) {
+      return continueJob();
+    } else {
+      printUsage();
+      return -1;
+    }
+  }
+
+  /**
+   * Recover and continue the unfinished jobs. Polls the scheduler every 10
+   * seconds, logging each job, until all jobs are done.
+   *
+   * @return 0 when all the jobs are done, -1 if an IOException occurred.
+   */
+  private int continueJob() throws InterruptedException {
+    BalanceProcedureScheduler scheduler =
+        new BalanceProcedureScheduler(getConf());
+    try {
+      scheduler.init(true);
+      boolean allDone = false;
+      while (!allDone) {
+        Collection<BalanceJob> jobs = scheduler.getAllJobs();
+        allDone = true;
+        for (BalanceJob job : jobs) {
+          if (!job.isJobDone()) {
+            allDone = false;
+          }
+          LOG.info(job.toString());
+        }
+        if (!allDone) {
+          Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Continue balance job failed.", e);
+      return -1;
+    } finally {
+      scheduler.shutDown();
+    }
+    return 0;
+  }
+
+  /**
+   * Start a ProcedureScheduler and submit the job.
+   *
+   * @param command the command options.
+   * @param inputSrc the source input. This specifies the source path.
+   * @param inputDst the dst input. This specifies the dst path.
+   * @return 0 once the job has been submitted and waitUntilDone has returned,
+   *         -1 if the trash option is invalid or an IOException occurred.
+   */
+  private int submit(CommandLine command, String inputSrc, String inputDst)
+      throws IOException {
+    Builder builder = new Builder(inputSrc, inputDst);
+    // parse options.
+    builder.setRouterCluster(command.hasOption(ROUTER.getOpt()));
+    builder.setForceCloseOpen(command.hasOption(FORCE_CLOSE_OPEN.getOpt()));
+    if (command.hasOption(MAP.getOpt())) {
+      builder.setMap(Integer.parseInt(command.getOptionValue(MAP.getOpt())));
+    }
+    if (command.hasOption(BANDWIDTH.getOpt())) {
+      builder.setBandWidth(
+          Integer.parseInt(command.getOptionValue(BANDWIDTH.getOpt())));
+    }
+    if (command.hasOption(DELAY_DURATION.getOpt())) {
+      builder.setDelayDuration(
+          Long.parseLong(command.getOptionValue(DELAY_DURATION.getOpt())));
+    }
+    if (command.hasOption(TRASH.getOpt())) {
+      String val = command.getOptionValue(TRASH.getOpt());
+      // Constant-first comparison: a missing option value falls through to
+      // the usage message instead of failing with a NullPointerException.
+      if ("skip".equalsIgnoreCase(val)) {
+        builder.setTrashOpt(TrashOption.SKIP);
+      } else if ("trash".equalsIgnoreCase(val)) {
+        builder.setTrashOpt(TrashOption.TRASH);
+      } else if ("delete".equalsIgnoreCase(val)) {
+        builder.setTrashOpt(TrashOption.DELETE);
+      } else {
+        printUsage();
+        return -1;
+      }
+    }
+
+    // Submit the job.
+    BalanceProcedureScheduler scheduler =
+        new BalanceProcedureScheduler(getConf());
+    try {
+      // init() is inside the try block, as in continueJob(), so that the
+      // scheduler is shut down even when the initialization fails.
+      scheduler.init(false);
+      BalanceJob balanceJob = builder.build();
+      // Submit and wait until the job is done.
+      scheduler.submit(balanceJob);
+      scheduler.waitUntilDone(balanceJob);
+    } catch (IOException e) {
+      LOG.error("Submit balance job failed.", e);
+      return -1;
+    } finally {
+      scheduler.shutDown();
+    }
+    return 0;
+  }
+
+  /**
+   * Get src uri from Router. The mount entry of the federation path is looked
+   * up through the router admin address taken from the configuration.
+   *
+   * @param fedPath the mount point to resolve.
+   * @return an hdfs uri built from the name service id and the path of the
+   *         only destination of the mount entry.
+   * @throws IllegalArgumentException if the mount point doesn't exist or has
+   *         more than one destination.
+   */
+  private Path getSrcPath(String fedPath) throws IOException {
+    String address = getConf().getTrimmed(
+        RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+        RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
+    InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
+    RouterClient rClient = new RouterClient(routerSocket, getConf());
+    try {
+      MountTableManager mountTable = rClient.getMountTableManager();
+      MountTable entry = MountTableProcedure.getMountEntry(fedPath, mountTable);
+      if (entry == null) {
+        throw new IllegalArgumentException(
+            "The mount point doesn't exist. path=" + fedPath);
+      }
+      if (entry.getDestinations().size() > 1) {
+        throw new IllegalArgumentException(
+            "The mount point has more than one destination. path=" + fedPath);
+      }
+      String ns = entry.getDestinations().get(0).getNameserviceId();
+      String path = entry.getDestinations().get(0).getDest();
+      return new Path("hdfs://" + ns + path);
+    } finally {
+      rClient.close();
+    }
+  }
+
+  private void printUsage() {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp(
+        "fedbalance OPTIONS [submit|continue] <src> <target>\n\nOPTIONS",
+        CLI_OPTIONS);
+  }
+
+  /**
+   * Main function of the FedBalance program. Parses the input arguments and
+   * invokes the FedBalance::run() method, via the ToolRunner.
+   * @param argv Command-line arguments sent to FedBalance.
+   */
+  public static void main(String[] argv) {
+    Configuration conf = new HdfsConfiguration();
+    Class<Tool> balanceClazz = (Class<Tool>) conf
+        .getClass(FEDERATION_BALANCE_CLASS, FedBalance.class);
+    Tool balancer = ReflectionUtils.newInstance(balanceClazz, conf);
+    int exitCode;
+    try {
+      exitCode = ToolRunner.run(balancer, argv);
+    } catch (Exception e) {
+      LOG.warn("Couldn't complete FedBalance operation.", e);
+      exitCode = -1;
+    }
+    System.exit(exitCode);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceConfigs.java
similarity index 72%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceConfigs.java
index f869035..952aef2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceConfigs.java
@@ -15,16 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
- * This class contains constants for configuration keys and default values
- * used in hdfs procedure.
+ * Federation balance configuration properties.
  */
 @InterfaceAudience.Private
-public final class BalanceProcedureConfigKeys {
+public final class FedBalanceConfigs {
+  /* The class used for federation balance */
+  public static final String FEDERATION_BALANCE_CLASS =
+      "federation.balance.class";
+  public static final String LAST_SNAPSHOT_NAME = "DISTCP-BALANCE-CURRENT";
+  public static final String CURRENT_SNAPSHOT_NAME = "DISTCP-BALANCE-NEXT";
+  /* Specify the behaviour of trash. */
+  public enum TrashOption {
+    TRASH, DELETE, SKIP
+  }
+
   /* The worker threads number of the BalanceProcedureScheduler */
   public static final String WORK_THREAD_NUM =
       "hadoop.hdfs.procedure.work.thread.num";
@@ -37,5 +46,5 @@ public final class BalanceProcedureConfigKeys {
   public static final String JOURNAL_CLASS =
       "hadoop.hdfs.procedure.journal.class";
 
-  private BalanceProcedureConfigKeys() {}
+  private FedBalanceConfigs(){}
 }
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
new file mode 100644
index 0000000..56be7db
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+
+/**
+ * This class contains the basic information needed when Federation Balance.
+ */
+public class FedBalanceContext implements Writable {
+
+  /* the source path in the source sub-cluster */
+  private Path src;
+  /* the target path in the target sub-cluster */
+  private Path dst;
+  /* the mount point to be balanced */
+  private String mount;
+  /* Force close all open files when there is no diff between src and dst */
+  private boolean forceCloseOpenFiles;
+  /* Disable write by setting the mount point readonly. */
+  private boolean useMountReadOnly;
+  /* The map number of the distcp job. */
+  private int mapNum;
+  /* The bandwidth limit of the distcp job(MB). */
+  private int bandwidthLimit;
+  /* Move source path to trash after all the data are sync to target. Otherwise
+     delete the source directly. */
+  private TrashOption trashOpt;
+  /* How long will the procedures be delayed. */
+  private long delayDuration;
+
+  private Configuration conf;
+
+  /**
+   * Create an empty context. The fields are filled in either by readFields()
+   * or by the Builder.
+   */
+  public FedBalanceContext() {}
+
+  /** @return the configuration. */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** @return the source path in the source sub-cluster. */
+  public Path getSrc() {
+    return src;
+  }
+
+  /** @return the target path in the target sub-cluster. */
+  public Path getDst() {
+    return dst;
+  }
+
+  /** @return the mount point to be balanced. */
+  public String getMount() {
+    return mount;
+  }
+
+  /** @return true if all open files should be force closed. */
+  public boolean getForceCloseOpenFiles() {
+    return forceCloseOpenFiles;
+  }
+
+  /** @return true if write is disabled by setting the mount point readonly. */
+  public boolean getUseMountReadOnly() {
+    return useMountReadOnly;
+  }
+
+  /** @return the map number of the distcp job. */
+  public int getMapNum() {
+    return mapNum;
+  }
+
+  /** @return the bandwidth limit of the distcp job(MB). */
+  public int getBandwidthLimit() {
+    return bandwidthLimit;
+  }
+
+  /** @return the trash option of the source path. */
+  public TrashOption getTrashOpt() {
+    return trashOpt;
+  }
+
+  /**
+   * Serialize the context. The configuration is written first, followed by
+   * the remaining fields. readFields() must read in the same order.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    conf.write(out);
+    Text.writeString(out, src.toString());
+    Text.writeString(out, dst.toString());
+    Text.writeString(out, mount);
+    out.writeBoolean(forceCloseOpenFiles);
+    out.writeBoolean(useMountReadOnly);
+    out.writeInt(mapNum);
+    out.writeInt(bandwidthLimit);
+    // The trash option is stored by ordinal, so it must not be null here and
+    // the decoding depends on the order of the TrashOption constants.
+    out.writeInt(trashOpt.ordinal());
+    out.writeLong(delayDuration);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    conf = new Configuration(false);
+    conf.readFields(in);
+    src = new Path(Text.readString(in));
+    dst = new Path(Text.readString(in));
+    mount = Text.readString(in);
+    forceCloseOpenFiles = in.readBoolean();
+    useMountReadOnly = in.readBoolean();
+    mapNum = in.readInt();
+    bandwidthLimit = in.readInt();
+    trashOpt = TrashOption.values()[in.readInt()];
+    delayDuration = in.readLong();
+  }
+
+  /**
+   * Two contexts are equal if they are of the same class and all their fields
+   * except the configuration are equal.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    final FedBalanceContext other = (FedBalanceContext) obj;
+    EqualsBuilder eq = new EqualsBuilder();
+    eq.append(src, other.src);
+    eq.append(dst, other.dst);
+    eq.append(mount, other.mount);
+    eq.append(forceCloseOpenFiles, other.forceCloseOpenFiles);
+    eq.append(useMountReadOnly, other.useMountReadOnly);
+    eq.append(mapNum, other.mapNum);
+    eq.append(bandwidthLimit, other.bandwidthLimit);
+    eq.append(trashOpt, other.trashOpt);
+    eq.append(delayDuration, other.delayDuration);
+    return eq.isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37)
+        .append(src)
+        .append(dst)
+        .append(mount)
+        .append(forceCloseOpenFiles)
+        .append(useMountReadOnly)
+        .append(mapNum)
+        .append(bandwidthLimit)
+        .append(trashOpt)
+        .append(delayDuration)
+        .build();
+  }
+
+  /**
+   * Describe the context in one line. The mount point is only included in
+   * router mode.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("FedBalance context:")
+        .append(" src=").append(src)
+        .append(", dst=").append(dst);
+    if (useMountReadOnly) {
+      sb.append(", router-mode=true").append(", mount-point=").append(mount);
+    } else {
+      sb.append(", router-mode=false");
+    }
+    return sb.append(", forceCloseOpenFiles=").append(forceCloseOpenFiles)
+        .append(", trash=").append(trashOpt.name())
+        .append(", map=").append(mapNum)
+        .append(", bandwidth=").append(bandwidthLimit)
+        .append(", delayDuration=").append(delayDuration)
+        .toString();
+  }
+
+  /**
+   * This class helps building the FedBalanceContext. The paths, the mount
+   * point and the configuration are mandatory. The other fields keep their
+   * Java default values unless the matching setter is called, i.e. mapNum,
+   * bandwidthLimit and delayDuration stay 0 and trashOpt stays null.
+   */
+  static class Builder {
+    private final Path src;
+    private final Path dst;
+    private final String mount;
+    private final Configuration conf;
+    private boolean forceCloseOpenFiles = false;
+    private boolean useMountReadOnly = false;
+    private int mapNum;
+    private int bandwidthLimit;
+    private TrashOption trashOpt;
+    private long delayDuration;
+
+    /**
+     * Create a builder with the mandatory fields.
+     *
+     * @param src the source path in the source sub-cluster.
+     * @param dst the target path in the target sub-cluster.
+     * @param mount the mount point to be balanced.
+     * @param conf the configuration.
+     */
+    Builder(Path src, Path dst, String mount, Configuration conf) {
+      this.src = src;
+      this.dst = dst;
+      this.mount = mount;
+      this.conf = conf;
+    }
+
+    /**
+     * Force close open files.
+     * @param value true if force close all the open files.
+     * @return this builder.
+     */
+    public Builder setForceCloseOpenFiles(boolean value) {
+      this.forceCloseOpenFiles = value;
+      return this;
+    }
+
+    /**
+     * Use mount point readonly to disable write.
+     * @param value true if disabling write by setting mount point readonly.
+     * @return this builder.
+     */
+    public Builder setUseMountReadOnly(boolean value) {
+      this.useMountReadOnly = value;
+      return this;
+    }
+
+    /**
+     * The map number of the distcp job.
+     * @param value the map number of the distcp.
+     * @return this builder.
+     */
+    public Builder setMapNum(int value) {
+      this.mapNum = value;
+      return this;
+    }
+
+    /**
+     * The bandwidth limit of the distcp job(MB).
+     * @param value the bandwidth.
+     * @return this builder.
+     */
+    public Builder setBandwidthLimit(int value) {
+      this.bandwidthLimit = value;
+      return this;
+    }
+
+    /**
+     * Specify the trash behaviour after all the data is sync to the target.
+     * @param value the trash option.
+     * @return this builder.
+     */
+    public Builder setTrash(TrashOption value) {
+      this.trashOpt = value;
+      return this;
+    }
+
+    /**
+     * Specify the delayed duration when the procedures need to retry.
+     * @param value the delay duration.
+     * @return this builder.
+     */
+    public Builder setDelayDuration(long value) {
+      this.delayDuration = value;
+      return this;
+    }
+
+    /**
+     * Build the FedBalanceContext. Every builder field is copied to the new
+     * context as is. The configuration is shared, not copied.
+     *
+     * @return the FedBalanceContext obj.
+     */
+    public FedBalanceContext build() {
+      FedBalanceContext context = new FedBalanceContext();
+      context.src = this.src;
+      context.dst = this.dst;
+      context.mount = this.mount;
+      context.conf = this.conf;
+      context.forceCloseOpenFiles = this.forceCloseOpenFiles;
+      context.useMountReadOnly = this.useMountReadOnly;
+      context.mapNum = this.mapNum;
+      context.bandwidthLimit = this.bandwidthLimit;
+      context.trashOpt = this.trashOpt;
+      context.delayDuration = this.delayDuration;
+      return context;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java
new file mode 100644
index 0000000..8f78983
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Update mount table.
+ * Old mount table:
+ *   /a/b/c -> {ns:src path:/a/b/c}
+ * New mount table:
+ *   /a/b/c -> {ns:dst path:/a/b/c}
+ * After the destination is switched, the read-only flag of the mount point
+ * is cleared.
+ */
+public class MountTableProcedure extends BalanceProcedure {
+
+  private String mount;
+  private String dstPath;
+  private String dstNs;
+  private Configuration conf;
+
+  /**
+   * Create an empty procedure; the fields are populated by
+   * {@link #readFields(DataInput)}.
+   */
+  public MountTableProcedure() {}
+
+  /**
+   * Update mount entry to specified dst uri.
+   *
+   * @param name the name of the procedure.
+   * @param nextProcedure the name of the next procedure.
+   * @param delayDuration the delay duration when this procedure is delayed.
+   * @param mount the mount entry to be updated.
+   * @param dstPath the path the mount entry points to in the target
+   *                namespace.
+   * @param dstNs the target namespace.
+   * @param conf the configuration.
+   * @throws IOException kept in the signature for existing callers.
+   */
+  public MountTableProcedure(String name, String nextProcedure,
+      long delayDuration, String mount, String dstPath, String dstNs,
+      Configuration conf) throws IOException {
+    super(name, nextProcedure, delayDuration);
+    this.mount = mount;
+    this.dstPath = dstPath;
+    this.dstNs = dstNs;
+    this.conf = conf;
+  }
+
+  /**
+   * Switch the mount entry to the target and cancel its readonly flag.
+   *
+   * @return true once both mount table updates have succeeded.
+   * @throws IOException if the mount entry is missing or an update fails.
+   */
+  @Override
+  public boolean execute() throws RetryException, IOException {
+    updateMountTable();
+    return true;
+  }
+
+  /**
+   * Update the destination first, then enable write on the mount point.
+   */
+  private void updateMountTable() throws IOException {
+    updateMountTableDestination(mount, dstNs, dstPath, conf);
+    enableWrite(mount, conf);
+  }
+
+  /**
+   * Update the destination of the mount point to target namespace and target
+   * path.
+   *
+   * @param mount   the mount point.
+   * @param dstNs   the target namespace.
+   * @param dstPath the target path
+   * @param conf    the configuration of the router.
+   * @throws IOException if the mount entry doesn't exist or the update is
+   *                     rejected.
+   */
+  private static void updateMountTableDestination(String mount, String dstNs,
+      String dstPath, Configuration conf) throws IOException {
+    // try-with-resources keeps a failure of close() from hiding the
+    // exception thrown by the update itself.
+    try (RouterClient rClient = newRouterClient(conf)) {
+      MountTableManager mountTable = rClient.getMountTableManager();
+      MountTable entry = getExistingMountEntry(mount, mountTable);
+      RemoteLocation remoteLocation =
+          new RemoteLocation(dstNs, dstPath, mount);
+      entry.setDestinations(Arrays.asList(remoteLocation));
+      updateAndRefresh(mountTable, entry,
+          "Failed update mount table " + mount);
+    }
+  }
+
+  /**
+   * Gets the mount table entry.
+   * @param mount name of the mount entry.
+   * @param mountTable the mount table.
+   * @return corresponding mount entry, or null if no returned entry has
+   *         exactly this source path.
+   * @throws IOException in case of failure to retrieve mount entry.
+   */
+  public static MountTable getMountEntry(String mount,
+      MountTableManager mountTable)
+      throws IOException {
+    GetMountTableEntriesRequest getRequest =
+        GetMountTableEntriesRequest.newInstance(mount);
+    GetMountTableEntriesResponse getResponse =
+        mountTable.getMountTableEntries(getRequest);
+    List<MountTable> results = getResponse.getEntries();
+    // Only an exact source path match counts as the requested entry.
+    for (MountTable result : results) {
+      if (mount.equals(result.getSourcePath())) {
+        return result;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Disable write by making the mount point readonly.
+   *
+   * @param mount the mount point to set readonly.
+   * @param conf  the configuration of the router.
+   * @throws IOException if the mount entry doesn't exist or the update is
+   *                     rejected.
+   */
+  static void disableWrite(String mount, Configuration conf)
+      throws IOException {
+    setMountReadOnly(mount, true, conf);
+  }
+
+  /**
+   * Enable write by cancelling the mount point readonly.
+   *
+   * @param mount the mount point to cancel readonly.
+   * @param conf  the configuration of the router.
+   * @throws IOException if the mount entry doesn't exist or the update is
+   *                     rejected.
+   */
+  static void enableWrite(String mount, Configuration conf) throws IOException {
+    setMountReadOnly(mount, false, conf);
+  }
+
+  /**
+   * Enable or disable readonly of the mount point.
+   *
+   * @param mount    the mount point.
+   * @param readOnly enable or disable readonly.
+   * @param conf     the configuration of the router.
+   * @throws IOException if the mount entry doesn't exist or the update is
+   *                     rejected.
+   */
+  private static void setMountReadOnly(String mount, boolean readOnly,
+      Configuration conf) throws IOException {
+    // try-with-resources keeps a failure of close() from hiding the
+    // exception thrown by the update itself.
+    try (RouterClient rClient = newRouterClient(conf)) {
+      MountTableManager mountTable = rClient.getMountTableManager();
+      MountTable entry = getExistingMountEntry(mount, mountTable);
+      entry.setReadOnly(readOnly);
+      updateAndRefresh(mountTable, entry,
+          "Failed update mount table " + mount + " with readonly="
+              + readOnly);
+    }
+  }
+
+  /**
+   * Create a client for the router admin address found in the configuration.
+   *
+   * @param conf the configuration of the router.
+   * @return a new RouterClient; the caller is responsible for closing it.
+   * @throws IOException if the client can't be created.
+   */
+  private static RouterClient newRouterClient(Configuration conf)
+      throws IOException {
+    String address = conf.getTrimmed(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+        RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
+    InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
+    return new RouterClient(routerSocket, conf);
+  }
+
+  /**
+   * Look up the mount entry and fail if it is absent.
+   *
+   * @param mount name of the mount entry.
+   * @param mountTable the mount table.
+   * @return the mount entry, never null.
+   * @throws IOException if the entry doesn't exist or can't be retrieved.
+   */
+  private static MountTable getExistingMountEntry(String mount,
+      MountTableManager mountTable) throws IOException {
+    MountTable entry = getMountEntry(mount, mountTable);
+    if (entry == null) {
+      throw new IOException("Mount table " + mount + " doesn't exist");
+    }
+    return entry;
+  }
+
+  /**
+   * Submit the modified entry and then ask for a mount table refresh.
+   *
+   * @param mountTable the mount table.
+   * @param entry the modified mount entry to submit.
+   * @param failureMessage the message of the IOException thrown when the
+   *                       update reports a false status.
+   * @throws IOException if the update is rejected or a call fails.
+   */
+  private static void updateAndRefresh(MountTableManager mountTable,
+      MountTable entry, String failureMessage) throws IOException {
+    UpdateMountTableEntryRequest updateRequest =
+        UpdateMountTableEntryRequest.newInstance(entry);
+    UpdateMountTableEntryResponse response =
+        mountTable.updateMountTableEntry(updateRequest);
+    if (!response.getStatus()) {
+      throw new IOException(failureMessage);
+    }
+    mountTable.refreshMountTableEntries(
+        RefreshMountTableEntriesRequest.newInstance());
+  }
+
+  /**
+   * Serialize the parent state followed by mount, dstPath, dstNs and the
+   * configuration. The order must match {@link #readFields(DataInput)}.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Text.writeString(out, mount);
+    Text.writeString(out, dstPath);
+    Text.writeString(out, dstNs);
+    conf.write(out);
+  }
+
+  /**
+   * Restore the fields in the order written by {@link #write(DataOutput)}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    mount = Text.readString(in);
+    dstPath = Text.readString(in);
+    dstNs = Text.readString(in);
+    // Start from a Configuration without default resources; its content
+    // comes from the serialized data.
+    conf = new Configuration(false);
+    conf.readFields(in);
+  }
+
+  @VisibleForTesting
+  String getMount() {
+    return mount;
+  }
+
+  @VisibleForTesting
+  String getDstPath() {
+    return dstPath;
+  }
+
+  @VisibleForTesting
+  String getDstNs() {
+    return dstNs;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/TrashProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/TrashProcedure.java
new file mode 100644
index 0000000..94ae616
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/TrashProcedure.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+
+/**
+ * This procedure disposes of the source path according to the TrashOption of
+ * the context: move it to the corresponding trash, delete it, or skip it.
+ */
+public class TrashProcedure extends BalanceProcedure {
+
+  /**
+   * Value set for FS_TRASH_INTERVAL_KEY on the configuration used for the
+   * move to trash. NOTE(review): presumably forces trash to be enabled for
+   * the move -- confirm against the Trash implementation.
+   */
+  private static final float TRASH_INTERVAL = 60;
+
+  private DistributedFileSystem srcFs;
+  private FedBalanceContext context;
+  private Configuration conf;
+
+  /**
+   * Create an empty procedure; the fields are populated by
+   * {@link #readFields(DataInput)}.
+   */
+  public TrashProcedure() {}
+
+  /**
+   * The constructor of TrashProcedure.
+   *
+   * @param name the name of the procedure.
+   * @param nextProcedure the name of the next procedure.
+   * @param delayDuration the delay duration when this procedure is delayed.
+   * @param context the federation balance context.
+   * @throws IOException if the file system of the source path can't be
+   *                     obtained.
+   */
+  public TrashProcedure(String name, String nextProcedure, long delayDuration,
+      FedBalanceContext context) throws IOException {
+    super(name, nextProcedure, delayDuration);
+    this.context = context;
+    this.conf = context.getConf();
+    this.srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
+  }
+
+  /**
+   * Dispose of the source path.
+   *
+   * @return true once the source path has been handled.
+   * @throws IOException if moving to trash or deleting fails.
+   */
+  @Override
+  public boolean execute() throws IOException {
+    moveToTrash();
+    return true;
+  }
+
+  /**
+   * Delete source path to trash, delete it permanently, or leave it in
+   * place, depending on the trash option. Does nothing if the source path
+   * doesn't exist.
+   *
+   * @throws IOException if the move or delete reports failure, or the trash
+   *                     option is not a known one.
+   */
+  void moveToTrash() throws IOException {
+    Path src = context.getSrc();
+    if (!srcFs.exists(src)) {
+      // Nothing left to dispose of.
+      return;
+    }
+    TrashOption trashOption = context.getTrashOpt();
+    switch (trashOption) {
+    case TRASH:
+      // Set the trash interval on a private copy, so the Configuration
+      // obtained from the context is not modified as a side effect.
+      Configuration trashConf = new Configuration(conf);
+      trashConf.setFloat(FS_TRASH_INTERVAL_KEY, TRASH_INTERVAL);
+      if (!Trash.moveToAppropriateTrash(srcFs, src, trashConf)) {
+        throw new IOException("Failed move " + src + " to trash.");
+      }
+      break;
+    case DELETE:
+      // Recursive delete of the whole source tree.
+      if (!srcFs.delete(src, true)) {
+        throw new IOException("Failed delete " + src);
+      }
+      LOG.info("{} is deleted.", src);
+      break;
+    case SKIP:
+      break;
+    default:
+      throw new IOException("Unexpected trash option=" + trashOption);
+    }
+  }
+
+  /**
+   * @return the federation balance context of this procedure.
+   */
+  public FedBalanceContext getContext() {
+    return context;
+  }
+
+  /**
+   * Serialize the parent state followed by the context.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    context.write(out);
+  }
+
+  /**
+   * Restore the context written by {@link #write(DataOutput)}, then derive
+   * the configuration and the source file system from it.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    context = new FedBalanceContext();
+    context.readFields(in);
+    conf = context.getConf();
+    srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/package-info.java
similarity index 72%
copy from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java
copy to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/package-info.java
index 626d3b3..3007402 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/package-info.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,14 +16,10 @@
  * limitations under the License.
  */
 
+
 /**
- * Classes under this package implement a state machine used for balancing data
- * across federation namespaces.
+ * FedBalance is a tool for balancing data across federation clusters.
  */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-
-package org.apache.hadoop.hdfs.procedure;
-
+@InterfaceAudience.Public
+package org.apache.hadoop.tools.fedbalance;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJob.java
similarity index 99%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJob.java
index 847092a..8d5f9d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJob.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.builder.EqualsBuilder;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournal.java
similarity index 96%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournal.java
index 011ae85..da8eb74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournal.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import org.apache.hadoop.conf.Configurable;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournalInfoHDFS.java
similarity index 95%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournalInfoHDFS.java
index 4e759d8..0da8c36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournalInfoHDFS.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,9 +37,9 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.TMP_TAIL;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOB_PREFIX;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TMP_TAIL;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.JOB_PREFIX;
 
 /**
  * BalanceJournal based on HDFS. This class stores all the journals in the HDFS.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedure.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedure.java
index 6320e8f..080a7375 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedure.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -29,7 +29,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import static org.apache.hadoop.hdfs.procedure.BalanceJob.NEXT_PROCEDURE_NONE;
+import static org.apache.hadoop.tools.fedbalance.procedure.BalanceJob.NEXT_PROCEDURE_NONE;
 
 /**
  * The basic components of the Job. Extend this class to implement different
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java
index 74606c5..0f82b88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -40,9 +40,9 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM_DEFAULT;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOURNAL_CLASS;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM_DEFAULT;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.JOURNAL_CLASS;
 /**
  * The state machine framework consist of:
  *   Job:                The state machine. It implements the basic logic of the
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/package-info.java
similarity index 95%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/package-info.java
index 626d3b3..cb03d13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/package-info.java
@@ -23,7 +23,7 @@
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/shellprofile.d/hadoop-federation-balance.sh b/hadoop-tools/hadoop-federation-balance/src/main/shellprofile.d/hadoop-federation-balance.sh
new file mode 100644
index 0000000..2872c7a
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/shellprofile.d/hadoop-federation-balance.sh
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define the subcommand only if a function with this name doesn't exist yet.
+if ! declare -f hadoop_subcommand_fedbalance >/dev/null 2>/dev/null; then
+
+  # Register "fedbalance" only when running as the "hadoop" command.
+  if [[ "${HADOOP_SHELL_EXECNAME}" = hadoop ]]; then
+    hadoop_add_subcommand "fedbalance" client "balance data between sub-clusters"
+  fi
+
+  # the function below can't be indented otherwise shelldocs won't get it
+
+## @description  fedbalance command for hadoop
+## @audience     public
+## @stability    stable
+## @replaceable  yes
+function hadoop_subcommand_fedbalance
+{
+  # Select the FedBalance main class and add the distcp and
+  # federation-balance tools to the classpath.
+  # shellcheck disable=SC2034
+  HADOOP_CLASSNAME=org.apache.hadoop.tools.fedbalance.FedBalance
+  hadoop_add_to_classpath_tools hadoop-distcp
+  hadoop_add_to_classpath_tools hadoop-federation-balance
+}
+
+fi
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java
new file mode 100644
index 0000000..ec565c3
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java
@@ -0,0 +1,446 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.tools.fedbalance.DistCpProcedure.Stage;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure.RetryException;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.net.URI;
+import java.util.Random;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.CURRENT_SNAPSHOT_NAME;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.LAST_SNAPSHOT_NAME;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test DistCpProcedure.
+ */
+public class TestDistCpProcedure {
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  static final String MOUNT = "mock_mount_point";
+  private static final String SRCDAT = "srcdat";
+  private static final String DSTDAT = "dstdat";
+  private static final long BLOCK_SIZE = 1024;
+  private static final long FILE_SIZE = BLOCK_SIZE * 100;
+  private FileEntry[] srcfiles =
+      {new FileEntry(SRCDAT, true), new FileEntry(SRCDAT + "/a", false),
+          new FileEntry(SRCDAT + "/b", true),
+          new FileEntry(SRCDAT + "/b/c", false)};
+  private static String nnUri;
+
+  /**
+   * Start a two-datanode mini cluster using BLOCK_SIZE as the block size and
+   * store the scheduler journal under /procedure on that cluster.
+   */
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    DistCpProcedure.enabledForTest = true;
+
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    cluster = builder.numDataNodes(2).build();
+    cluster.waitActive();
+
+    // The journal directory lives on the name node that was just started.
+    final String nnAddress = cluster.getNameNode().getHostAndPort();
+    conf.set(SCHEDULER_JOURNAL_URI, "hdfs://" + nnAddress + "/procedure");
+
+    nnUri = FileSystem.getDefaultUri(conf).toString();
+  }
+
+  /**
+   * Switch the test hook of DistCpProcedure off again and stop the cluster
+   * if it was started.
+   */
+  @AfterClass
+  public static void afterClass() {
+    DistCpProcedure.enabledForTest = false;
+    if (cluster == null) {
+      return;
+    }
+    cluster.shutdown();
+  }
+
+  @Test(timeout = 30000)
+  public void testSuccessfulDistCpProcedure() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+    FsPermission originalPerm = new FsPermission(777);
+    fs.setPermission(src, originalPerm);
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
+    scheduler.init(true);
+
+    BalanceJob balanceJob =
+        new BalanceJob.Builder<>().nextProcedure(dcProcedure).build();
+    scheduler.submit(balanceJob);
+    scheduler.waitUntilDone(balanceJob);
+    assertTrue(balanceJob.isJobDone());
+    if (balanceJob.getError() != null) {
+      throw balanceJob.getError();
+    }
+    assertNull(balanceJob.getError());
+    assertTrue(fs.exists(dst));
+    assertFalse(
+        fs.exists(new Path(context.getSrc(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+    assertFalse(
+        fs.exists(new Path(context.getDst(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+    assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
+    assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
+    for (FileEntry e : srcfiles) { // verify file len.
+      if (!e.isDir) {
+        Path targetFile = new Path(testRoot, e.path.replace(SRCDAT, DSTDAT));
+        assertEquals(FILE_SIZE, fs.getFileStatus(targetFile).getLen());
+      }
+    }
+    cleanup(fs, new Path(testRoot));
+  }
+
+  /**
+   * Submit the initial distcp, delete a source file while the copy is in
+   * flight and verify that the file is copied anyway.
+   */
+  @Test(timeout = 30000)
+  public void testInitDistCp() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+
+    final Path src = new Path(testRoot, SRCDAT);
+    final Path dst = new Path(testRoot, DSTDAT);
+    final Path fileA = new Path(src, "a");
+    // set permission.
+    fs.setPermission(src, FsPermission.createImmutable((short) 020));
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+
+    // submit distcp.
+    try {
+      dcProcedure.initDistCp();
+    } catch (RetryException ignored) {
+      // Deliberately ignored: the loop in executeProcedure below keeps
+      // calling initDistCp() until the stage advances.
+    }
+    fs.delete(fileA, true);
+    // wait until job done.
+    executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
+        dcProcedure::initDistCp);
+    assertTrue(fs.exists(dst));
+    // Because we used snapshot, the file should be copied.
+    assertTrue(fs.exists(new Path(dst, "a")));
+    cleanup(fs, new Path(testRoot));
+  }
+
+  /**
+   * After the initial copy, change the source in three ways (move a file
+   * out, move it back, append to it) and verify that each change is
+   * reflected at the destination after the final distcp.
+   */
+  @Test(timeout = 30000)
+  public void testDiffDistCp() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
+        () -> dcProcedure.initDistCp());
+    assertTrue(fs.exists(dst));
+
+    // move file out of src and test distcp.
+    fs.rename(new Path(src, "a"), new Path("/a"));
+    executeProcedure(dcProcedure, Stage.FINISH,
+        () -> dcProcedure.finalDistCp());
+    assertFalse(fs.exists(new Path(dst, "a")));
+    // move back file src/a and test distcp.
+    fs.rename(new Path("/a"), new Path(src, "a"));
+    executeProcedure(dcProcedure, Stage.FINISH,
+        () -> dcProcedure.finalDistCp());
+    assertTrue(fs.exists(new Path(dst, "a")));
+    // append file src/a and test. try-with-resources makes sure the stream is
+    // closed even when the write fails.
+    try (OutputStream out = fs.append(new Path(src, "a"))) {
+      out.write("hello".getBytes());
+    }
+    long len = fs.getFileStatus(new Path(src, "a")).getLen();
+    executeProcedure(dcProcedure, Stage.FINISH,
+        () -> dcProcedure.finalDistCp());
+    assertEquals(len, fs.getFileStatus(new Path(dst, "a")).getLen());
+    cleanup(fs, new Path(testRoot));
+  }
+
+  /**
+   * Keep a source file open for append while the procedure runs to FINISH
+   * and verify that closing the stream afterwards fails with a
+   * LeaseExpiredException.
+   */
+  @Test(timeout = 30000)
+  public void testStageFinalDistCp() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+
+    final Path src = new Path(testRoot, SRCDAT);
+    final Path dst = new Path(testRoot, DSTDAT);
+    // open files. The stream is left open on purpose, see the check below.
+    final OutputStream out = fs.append(new Path(src, "a"));
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
+        dcProcedure::initDistCp);
+    executeProcedure(dcProcedure, Stage.FINISH,
+        dcProcedure::finalDistCp);
+    // Verify all the open files have been closed.
+    intercept(RemoteException.class, "LeaseExpiredException",
+        "Expect RemoteException(LeaseExpiredException).", () -> out.close());
+    cleanup(fs, new Path(testRoot));
+  }
+
+  /**
+   * Prepare the snapshots by hand, run only disableWrite() and finish(), and
+   * verify the snapshots are gone and the permissions are as expected.
+   */
+  @Test(timeout = 30000)
+  public void testStageFinish() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+    fs.mkdirs(src);
+    fs.mkdirs(dst);
+    fs.allowSnapshot(src);
+    fs.allowSnapshot(dst);
+    fs.createSnapshot(src, LAST_SNAPSHOT_NAME);
+    fs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
+    fs.createSnapshot(dst, LAST_SNAPSHOT_NAME);
+    // The mode has to be an octal literal: decimal 777 is octal 01411, i.e.
+    // the sticky bit plus r-----x--x, not rwxrwxrwx.
+    FsPermission originalPerm = new FsPermission((short) 0777);
+    fs.setPermission(src, originalPerm);
+
+    // Test the finish stage.
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    dcProcedure.disableWrite();
+    dcProcedure.finish();
+
+    // Verify path and permission.
+    assertTrue(fs.exists(dst));
+    assertFalse(fs.exists(new Path(src, HdfsConstants.DOT_SNAPSHOT_DIR)));
+    assertFalse(fs.exists(new Path(dst, HdfsConstants.DOT_SNAPSHOT_DIR)));
+    assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
+    assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
+    cleanup(fs, new Path(testRoot));
+  }
+
+  /**
+   * Walk the procedure through its stages one at a time, replacing the
+   * procedure object with a serialized-and-deserialized copy before each
+   * stage, and verify the final state.
+   */
+  @Test(timeout = 30000)
+  public void testRecoveryByStage() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    // A one-element array lets the lambdas below see the procedure even
+    // though the reference is replaced after every serialization round trip.
+    final DistCpProcedure[] dcp = new DistCpProcedure[1];
+    dcp[0] = new DistCpProcedure("distcp-procedure", null, 1000, context);
+
+    // Doing serialization and deserialization before each stage to monitor the
+    // recovery.
+    dcp[0] = serializeProcedure(dcp[0]);
+    executeProcedure(dcp[0], Stage.INIT_DISTCP, () -> dcp[0].preCheck());
+    dcp[0] = serializeProcedure(dcp[0]);
+    executeProcedure(dcp[0], Stage.DIFF_DISTCP, () -> dcp[0].initDistCp());
+    fs.delete(new Path(src, "a"), true); // make some difference.
+    dcp[0] = serializeProcedure(dcp[0]);
+    executeProcedure(dcp[0], Stage.DISABLE_WRITE, () -> dcp[0].diffDistCp());
+    dcp[0] = serializeProcedure(dcp[0]);
+    executeProcedure(dcp[0], Stage.FINAL_DISTCP, () -> dcp[0].disableWrite());
+    dcp[0] = serializeProcedure(dcp[0]);
+    // Open a source file before the final distcp; closing it afterwards is
+    // expected to fail with a LeaseExpiredException.
+    OutputStream out = fs.append(new Path(src, "b/c"));
+    executeProcedure(dcp[0], Stage.FINISH, () -> dcp[0].finalDistCp());
+    intercept(RemoteException.class, "LeaseExpiredException",
+        "Expect RemoteException(LeaseExpiredException).", () -> out.close());
+    dcp[0] = serializeProcedure(dcp[0]);
+    // The last execute() has to report true.
+    assertTrue(dcp[0].execute());
+    assertTrue(fs.exists(dst));
+    assertFalse(
+        fs.exists(new Path(context.getSrc(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+    assertFalse(
+        fs.exists(new Path(context.getDst(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+    cleanup(fs, new Path(testRoot));
+  }
+
+  /**
+   * Submit a job, wait for a random time below ten seconds and shut the
+   * scheduler down. The test passes when nothing throws.
+   */
+  @Test(timeout = 30000)
+  public void testShutdown() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
+    scheduler.init(true);
+
+    BalanceJob balanceJob =
+        new BalanceJob.Builder<>().nextProcedure(dcProcedure).build();
+    scheduler.submit(balanceJob);
+
+    // nextInt(bound) is always in [0, bound). Math.abs(nextLong()) is not
+    // safe here: it stays negative for Long.MIN_VALUE, which would make
+    // Thread.sleep throw IllegalArgumentException.
+    long sleep = new Random().nextInt(10000);
+    Thread.sleep(sleep);
+    scheduler.shutDown();
+    cleanup(fs, new Path(testRoot));
+  }
+
+  @Test(timeout = 30000)
+  public void testDisableWrite() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    assertNotEquals(0, fs.getFileStatus(src).getPermission().toShort());
+    executeProcedure(dcProcedure, Stage.FINAL_DISTCP,
+        () -> dcProcedure.disableWrite());
+    assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
+    cleanup(fs, new Path(testRoot));
+  }
+
+  private FedBalanceContext buildContext(Path src, Path dst, String mount) {
+    return new FedBalanceContext.Builder(src, dst, mount, conf).setMapNum(10)
+        .setBandwidthLimit(1).setTrash(TrashOption.TRASH).setDelayDuration(1000)
+        .build();
+  }
+
+  /**
+   * One step of a procedure, passed to executeProcedure as a lambda.
+   */
+  @FunctionalInterface
+  interface Call {
+    /**
+     * Run the step once.
+     *
+     * @throws IOException if the step fails.
+     * @throws RetryException if the step has to be called again.
+     */
+    void execute() throws IOException, RetryException;
+  }
+
+  /**
+   * Reset the procedure to PRE_CHECK, then invoke the call repeatedly until
+   * the procedure reports the target stage.
+   *
+   * @param procedure the procedure to be executed and verified.
+   * @param target the stage at which to stop.
+   * @param call the function executing the procedure.
+   * @throws IOException if the call fails with an IOException.
+   */
+  private static void executeProcedure(DistCpProcedure procedure, Stage target,
+      Call call) throws IOException {
+    procedure.updateStage(Stage.PRE_CHECK);
+    Stage current = Stage.PRE_CHECK;
+    while (current != target) {
+      try {
+        call.execute();
+      } catch (RetryException ignored) {
+        // A retry request only means "call again"; the loop does that.
+      } finally {
+        // Re-read the stage whether or not the call succeeded.
+        current = procedure.getStage();
+      }
+    }
+  }
+
+  /**
+   * A path below the test root together with a flag telling whether it is a
+   * directory or a file.
+   */
+  static class FileEntry {
+    private final String path;
+    private final boolean isDir;
+
+    FileEntry(String path, boolean isDir) {
+      this.isDir = isDir;
+      this.path = path;
+    }
+
+    /** @return the path of this entry. */
+    String getPath() {
+      return this.path;
+    }
+
+    /** @return true if this entry is a directory. */
+    boolean isDirectory() {
+      return this.isDir;
+    }
+  }
+
+  /**
+   * Create directories and files with random data.
+   *
+   * @param fs the file system obj.
+   * @param topdir the base dir of the directories and files.
+   * @param entries the directory and file entries to be created.
+   */
+  private void createFiles(DistributedFileSystem fs, String topdir,
+      FileEntry[] entries) throws IOException {
+    long seed = System.currentTimeMillis();
+    Random rand = new Random(seed);
+    short replicationFactor = 2;
+    for (FileEntry entry : entries) {
+      Path newPath = new Path(topdir + "/" + entry.getPath());
+      if (entry.isDirectory()) {
+        fs.mkdirs(newPath);
+      } else {
+        int bufSize = 128;
+        DFSTestUtil.createFile(fs, newPath, bufSize, FILE_SIZE, BLOCK_SIZE,
+            replicationFactor, seed);
+      }
+      seed = System.currentTimeMillis() + rand.nextLong();
+    }
+  }
+
+  /**
+   * Write the procedure to a byte buffer and read it back into a fresh
+   * instance.
+   *
+   * @param dcp the procedure to serialize.
+   * @return a new procedure restored from the serialized bytes.
+   * @throws IOException if writing or reading fails.
+   */
+  private DistCpProcedure serializeProcedure(DistCpProcedure dcp)
+      throws IOException {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    DataOutput sink = new DataOutputStream(buffer);
+    dcp.write(sink);
+
+    ByteArrayInputStream bytes = new ByteArrayInputStream(buffer.toByteArray());
+    DistCpProcedure restored = new DistCpProcedure();
+    restored.readFields(new DataInputStream(bytes));
+    return restored;
+  }
+
+  /**
+   * Clean up the snapshots of the source and destination directories below
+   * the root, then delete the root recursively.
+   *
+   * @param dfs the file system obj.
+   * @param root the test root.
+   * @throws IOException if the cleanup or the delete fails.
+   */
+  private void cleanup(DistributedFileSystem dfs, Path root)
+      throws IOException {
+    DistCpProcedure.cleanupSnapshot(dfs, new Path(root, SRCDAT));
+    DistCpProcedure.cleanupSnapshot(dfs, new Path(root, DSTDAT));
+    dfs.delete(root, true);
+  }
+}
diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java
new file mode 100644
index 0000000..9dd4e5d
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Basic tests of MountTableProcedure.
+ */
+public class TestMountTableProcedure {
+
+  private static StateStoreDFSCluster cluster;
+  private static RouterContext routerContext;
+  private static Configuration routerConf;
+  private static List<MountTable> mockMountTable;
+  private static StateStoreService stateStore;
+
+  /**
+   * Start the routers, register an active namenode for ns0 and ns1 and
+   * prepare the configuration carrying the router admin address.
+   */
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    cluster = new StateStoreDFSCluster(false, 1);
+
+    // Build and start a router with State Store + admin + RPC
+    Configuration routerOverrides =
+        new RouterConfigBuilder().stateStore().admin().rpc().build();
+    cluster.addRouterOverrides(routerOverrides);
+    cluster.startRouters();
+
+    routerContext = cluster.getRandomRouter();
+    mockMountTable = cluster.generateMockMountTable();
+    final Router router = routerContext.getRouter();
+    stateStore = router.getStateStore();
+
+    // Add two name services for testing
+    final ActiveNamenodeResolver resolver = router.getNamenodeResolver();
+    for (String nameservice : new String[] {"ns0", "ns1"}) {
+      resolver.registerNamenode(createNamenodeReport(nameservice, "nn1",
+          HAServiceProtocol.HAServiceState.ACTIVE));
+    }
+    stateStore.refreshCaches(true);
+
+    // The tests hand this configuration to the procedures and clients.
+    final InetSocketAddress adminAddress = router.getAdminServerAddress();
+    routerConf = new Configuration();
+    routerConf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+        adminAddress);
+  }
+
+  /**
+   * Stop the router used by the tests.
+   */
+  @AfterClass
+  public static void tearDown() {
+    // globalSetUp may have failed before these fields were assigned; do not
+    // hide that failure behind a NullPointerException.
+    if (cluster != null && routerContext != null) {
+      cluster.stopRouter(routerContext);
+    }
+  }
+
+  @Before
+  public void testSetup() throws Exception {
+    assertTrue(
+        synchronizeRecords(stateStore, mockMountTable, MountTable.class));
+    // Avoid running with random users
+    routerContext.resetAdminClient();
+  }
+
+  /**
+   * Add a mount entry pointing to ns0, run MountTableProcedure and verify
+   * that the entry then points to the new destination in ns1.
+   */
+  @Test
+  public void testUpdateMountpoint() throws Exception {
+    // Firstly add mount entry: /test-path->{ns0,/test-path}.
+    String mount = "/test-path";
+    String dst = "/test-dst";
+    MountTable newEntry = MountTable
+        .newInstance(mount, Collections.singletonMap("ns0", mount),
+            Time.now(), Time.now());
+    MountTableManager mountTable =
+        routerContext.getAdminClient().getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+    // verify the mount entry is added successfully.
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance("/");
+    stateStore.loadCache(MountTableStoreImpl.class, true); // load cache.
+    GetMountTableEntriesResponse response =
+        mountTable.getMountTableEntries(request);
+    assertEquals(3, response.getEntries().size());
+
+    // set the mount table to readonly.
+    MountTableProcedure.disableWrite(mount, routerConf);
+
+    // test MountTableProcedure updates the mount point.
+    String dstNs = "ns1";
+    MountTableProcedure smtp =
+        new MountTableProcedure("single-mount-table-procedure", null,
+            1000, mount, dst, dstNs, routerConf);
+    assertTrue(smtp.execute());
+    stateStore.loadCache(MountTableStoreImpl.class, true); // load cache.
+    // verify the mount entry is updated to {ns1,/test-dst}.
+    MountTable entry =
+        MountTableProcedure.getMountEntry(mount, mountTable);
+    assertNotNull(entry);
+    assertEquals(1, entry.getDestinations().size());
+    String nsId = entry.getDestinations().get(0).getNameserviceId();
+    String dstPath = entry.getDestinations().get(0).getDest();
+    assertEquals(dstNs, nsId);
+    assertEquals(dst, dstPath);
+    // Verify the mount table is not readonly. The client is closed by the
+    // try-with-resources block instead of being leaked.
+    URI address = routerContext.getFileSystemURI();
+    try (DFSClient routerClient = new DFSClient(address, routerConf)) {
+      MountTableProcedure.enableWrite(mount, routerConf);
+      // A writable mount point gets past the read-only check and fails only
+      // because no namenode is available.
+      intercept(RemoteException.class,
+          "No namenode available to invoke mkdirs",
+          "Expect no namenode exception.", () -> routerClient
+              .mkdirs(mount + "/file", new FsPermission(020), false));
+    }
+  }
+
+  /**
+   * Verify that disableWrite() makes a mount point read only and that
+   * enableWrite() reverts it.
+   */
+  @Test
+  public void testDisableAndEnableWrite() throws Exception {
+    // Firstly add mount entry: /test-write->{ns0,/test-write}.
+    String mount = "/test-write";
+    MountTable newEntry = MountTable
+        .newInstance(mount, Collections.singletonMap("ns0", mount),
+            Time.now(), Time.now());
+    MountTableManager mountTable =
+        routerContext.getAdminClient().getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+    stateStore.loadCache(MountTableStoreImpl.class, true); // load cache.
+
+    // Construct client. It is closed by the try-with-resources block instead
+    // of being leaked.
+    URI address = routerContext.getFileSystemURI();
+    try (DFSClient routerClient = new DFSClient(address, routerConf)) {
+      // Verify the mount point is not readonly: the call gets past the
+      // read-only check and fails only because no namenode is available.
+      intercept(RemoteException.class,
+          "No namenode available to invoke mkdirs",
+          "Expect no namenode exception.", () -> routerClient
+              .mkdirs(mount + "/file", new FsPermission(020), false));
+
+      // Verify disable write.
+      MountTableProcedure.disableWrite(mount, routerConf);
+      intercept(RemoteException.class, "is in a read only mount point",
+          "Expect readonly exception.", () -> routerClient
+              .mkdirs(mount + "/dir", new FsPermission(020), false));
+
+      // Verify enable write.
+      MountTableProcedure.enableWrite(mount, routerConf);
+      intercept(RemoteException.class,
+          "No namenode available to invoke mkdirs",
+          "Expect no namenode exception.", () -> routerClient
+              .mkdirs(mount + "/file", new FsPermission(020), false));
+    }
+  }
+
+  /**
+   * Serialize a MountTableProcedure, read it back into a fresh instance and
+   * verify that mount, destination path and destination name service
+   * survive the round trip.
+   */
+  @Test
+  public void testSeDeserialize() throws Exception {
+    final String fedPath = "/test-path";
+    final String dst = "/test-dst";
+    final String dstNs = "ns1";
+    MountTableProcedure original =
+        new MountTableProcedure("single-mount-table-procedure", null,
+            1000, fedPath, dst, dstNs, routerConf);
+
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    DataOutput sink = new DataOutputStream(buffer);
+    original.write(sink);
+
+    MountTableProcedure restored = new MountTableProcedure();
+    ByteArrayInputStream bytes = new ByteArrayInputStream(buffer.toByteArray());
+    restored.readFields(new DataInputStream(bytes));
+
+    assertEquals(fedPath, restored.getMount());
+    assertEquals(dst, restored.getDstPath());
+    assertEquals(dstNs, restored.getDstNs());
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestTrashProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestTrashProcedure.java
new file mode 100644
index 0000000..a128932
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestTrashProcedure.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for TrashProcedure: moving the source to trash and the
+ * serialization round trip.
+ */
+public class TestTrashProcedure {
+
+  private static Configuration conf;
+  private static MiniDFSCluster cluster;
+  private static String nnUri;
+
+  /**
+   * Start a two-datanode mini cluster shared by all tests.
+   */
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    conf = new Configuration();
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    cluster = builder.numDataNodes(2).build();
+    cluster.waitActive();
+    nnUri = FileSystem.getDefaultUri(conf).toString();
+  }
+
+  /**
+   * Stop the mini cluster if it was started.
+   */
+  @AfterClass
+  public static void afterClass() {
+    if (cluster == null) {
+      return;
+    }
+    cluster.shutdown();
+  }
+
+  /**
+   * Verify that the source directory no longer exists after moveToTrash().
+   */
+  @Test
+  public void testTrashProcedure() throws Exception {
+    // getMethodName() is called from the test method itself, as before;
+    // presumably it resolves the name from the caller -- do not move it into
+    // a helper.
+    final String name = getMethodName();
+    final Path src = new Path("/" + name + "-src");
+    final Path dst = new Path("/" + name + "-dst");
+    final FileSystem fs = cluster.getFileSystem();
+    fs.mkdirs(src);
+    fs.mkdirs(new Path(src, "dir"));
+    assertTrue(fs.exists(src));
+
+    TrashProcedure trashProcedure =
+        new TrashProcedure("trash-procedure", null, 1000,
+            newContext(src, dst));
+    trashProcedure.moveToTrash();
+    assertFalse(fs.exists(src));
+  }
+
+  /**
+   * Verify that the context survives a write()/readFields() round trip.
+   */
+  @Test
+  public void testSeDeserialize() throws Exception {
+    final String name = getMethodName();
+    final Path src = new Path("/" + name + "-src");
+    final Path dst = new Path("/" + name + "-dst");
+    final FedBalanceContext context = newContext(src, dst);
+    TrashProcedure original =
+        new TrashProcedure("trash-procedure", null, 1000, context);
+
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    DataOutput sink = new DataOutputStream(buffer);
+    original.write(sink);
+
+    TrashProcedure restored = new TrashProcedure();
+    ByteArrayInputStream bytes = new ByteArrayInputStream(buffer.toByteArray());
+    restored.readFields(new DataInputStream(bytes));
+    assertEquals(context, restored.getContext());
+  }
+
+  /**
+   * Build the context used by both tests: 10 maps, bandwidth limit 1 and
+   * trash option TRASH.
+   */
+  private static FedBalanceContext newContext(Path src, Path dst) {
+    return new FedBalanceContext.Builder(src, dst, TestDistCpProcedure.MOUNT,
+        conf)
+        .setMapNum(10)
+        .setBandwidthLimit(1)
+        .setTrash(TrashOption.TRASH)
+        .build();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/MultiPhaseProcedure.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/MultiPhaseProcedure.java
index 27cfebd..b9c9c1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/MultiPhaseProcedure.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RecordProcedure.java
similarity index 96%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RecordProcedure.java
index 706d4a1..9754b09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RecordProcedure.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RetryProcedure.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RetryProcedure.java
index 336873e..faec834 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RetryProcedure.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import java.io.DataInput;
 import java.io.DataOutput;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/TestBalanceProcedureScheduler.java
similarity index 98%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/TestBalanceProcedureScheduler.java
index 39e000b..7a2b449 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/TestBalanceProcedureScheduler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -43,8 +43,8 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertEquals;
@@ -70,6 +70,7 @@ public class TestBalanceProcedureScheduler {
     CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs:///");
     CONF.setBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, true);
     CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    CONF.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
     CONF.setInt(WORK_THREAD_NUM, 1);
 
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/UnrecoverableProcedure.java
similarity index 96%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/UnrecoverableProcedure.java
index 941d0a0..804f1aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/UnrecoverableProcedure.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import java.io.IOException;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/WaitProcedure.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/WaitProcedure.java
index 8666caf..af46b17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/WaitProcedure.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
 
 import org.apache.hadoop.util.Time;
 
diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml
index f923bb7..cc811fc 100644
--- a/hadoop-tools/hadoop-tools-dist/pom.xml
+++ b/hadoop-tools/hadoop-tools-dist/pom.xml
@@ -46,6 +46,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-federation-balance</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-archives</artifactId>
       <scope>compile</scope>
     </dependency>
diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml
index eb0a31a..f026bc2 100644
--- a/hadoop-tools/pom.xml
+++ b/hadoop-tools/pom.xml
@@ -32,6 +32,7 @@
   <modules>
     <module>hadoop-streaming</module>
     <module>hadoop-distcp</module>
+    <module>hadoop-federation-balance</module>
     <module>hadoop-dynamometer</module>
     <module>hadoop-archives</module>
     <module>hadoop-archive-logs</module>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org