Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2020/06/18 05:34:08 UTC
[hadoop] branch trunk updated: HDFS-15346. FedBalance tool
implementation. Contributed by Jinglun.
This is an automated email from the ASF dual-hosted git repository.
yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9cbd76c HDFS-15346. FedBalance tool implementation. Contributed by Jinglun.
9cbd76c is described below
commit 9cbd76cc775b58dfedb943f971b3307ec5702f13
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Thu Jun 18 13:33:25 2020 +0800
HDFS-15346. FedBalance tool implementation. Contributed by Jinglun.
---
.../src/main/resources/assemblies/hadoop-tools.xml | 15 +
.../apache/hadoop/hdfs/protocol/HdfsConstants.java | 2 +
hadoop-project/pom.xml | 17 +
hadoop-tools/hadoop-federation-balance/pom.xml | 249 ++++++++
.../tools/fedbalance/DistCpBalanceOptions.java | 95 +++
.../hadoop/tools/fedbalance/DistCpProcedure.java | 635 +++++++++++++++++++++
.../apache/hadoop/tools/fedbalance/FedBalance.java | 377 ++++++++++++
.../hadoop/tools/fedbalance/FedBalanceConfigs.java | 19 +-
.../hadoop/tools/fedbalance/FedBalanceContext.java | 286 ++++++++++
.../tools/fedbalance/MountTableProcedure.java | 244 ++++++++
.../hadoop/tools/fedbalance/TrashProcedure.java | 112 ++++
.../hadoop/tools/fedbalance}/package-info.java | 14 +-
.../tools/fedbalance}/procedure/BalanceJob.java | 2 +-
.../fedbalance}/procedure/BalanceJournal.java | 2 +-
.../procedure/BalanceJournalInfoHDFS.java | 8 +-
.../fedbalance}/procedure/BalanceProcedure.java | 4 +-
.../procedure/BalanceProcedureScheduler.java | 8 +-
.../tools/fedbalance}/procedure/package-info.java | 2 +-
.../shellprofile.d/hadoop-federation-balance.sh | 38 ++
.../tools/fedbalance/TestDistCpProcedure.java | 446 +++++++++++++++
.../tools/fedbalance/TestMountTableProcedure.java | 222 +++++++
.../tools/fedbalance/TestTrashProcedure.java | 102 ++++
.../fedbalance}/procedure/MultiPhaseProcedure.java | 2 +-
.../fedbalance}/procedure/RecordProcedure.java | 2 +-
.../fedbalance}/procedure/RetryProcedure.java | 2 +-
.../procedure/TestBalanceProcedureScheduler.java | 7 +-
.../procedure/UnrecoverableProcedure.java | 2 +-
.../tools/fedbalance}/procedure/WaitProcedure.java | 2 +-
hadoop-tools/hadoop-tools-dist/pom.xml | 5 +
hadoop-tools/pom.xml | 1 +
30 files changed, 2887 insertions(+), 35 deletions(-)
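
For readers skimming the patch: the tool is invoked as "fedbalance OPTIONS [submit|continue] <src> <target>" (see FedBalance#printUsage below). A hedged sketch of driving it programmatically; the paths, namespace names and option values here are illustrative, not from the patch:

  import org.apache.hadoop.hdfs.HdfsConfiguration;
  import org.apache.hadoop.tools.fedbalance.FedBalance;
  import org.apache.hadoop.util.ToolRunner;

  // Equivalent CLI: fedbalance -router -map 20 submit /data hdfs://ns2/data
  public class FedBalanceExample {
    public static void main(String[] args) throws Exception {
      String[] toolArgs = {"-router", "-map", "20",
          "submit", "/data", "hdfs://ns2/data"};
      int rc = ToolRunner.run(new HdfsConfiguration(),
          new FedBalance(), toolArgs);
      System.exit(rc);
    }
  }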
diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
index 054d8c0..db744f5 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
@@ -48,6 +48,14 @@
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
+ <directory>../hadoop-federation-balance/src/main/shellprofile.d</directory>
+ <includes>
+ <include>*</include>
+ </includes>
+ <outputDirectory>/libexec/shellprofile.d</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
<directory>../hadoop-extras/src/main/shellprofile.d</directory>
<includes>
<include>*</include>
@@ -112,6 +120,13 @@
</includes>
</fileSet>
<fileSet>
+ <directory>../hadoop-federation-balance/target</directory>
+ <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
+ <includes>
+ <include>*-sources.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>../hadoop-extras/target</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
<includes>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index ab61e50..a025b9b 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -105,6 +105,8 @@ public final class HdfsConstants {
public static final String DOT_SNAPSHOT_DIR = ".snapshot";
public static final String SEPARATOR_DOT_SNAPSHOT_DIR
= Path.SEPARATOR + DOT_SNAPSHOT_DIR;
+ public static final String DOT_SNAPSHOT_DIR_SEPARATOR =
+ DOT_SNAPSHOT_DIR + Path.SEPARATOR;
public static final String SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR
= Path.SEPARATOR + DOT_SNAPSHOT_DIR + Path.SEPARATOR;
public final static String DOT_RESERVED_STRING = ".reserved";
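
The added DOT_SNAPSHOT_DIR_SEPARATOR is the relative-path twin of the existing SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR; a small sketch of how the balance tool composes snapshot paths with it (directory and snapshot name are illustrative):

  // ".snapshot/" composes naturally with the two-argument Path constructor:
  Path src = new Path("hdfs://ns1/data");
  Path snap = new Path(src,
      HdfsConstants.DOT_SNAPSHOT_DIR_SEPARATOR + "snap-1");
  // snap -> hdfs://ns1/data/.snapshot/snap-1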
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index f3a3d76..48928b5 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -329,6 +329,12 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-rbf</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>${hadoop.version}</version>
</dependency>
@@ -580,6 +586,17 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-federation-balance</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-federation-balance</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-datajoin</artifactId>
<version>${hadoop.version}</version>
</dependency>
diff --git a/hadoop-tools/hadoop-federation-balance/pom.xml b/hadoop-tools/hadoop-federation-balance/pom.xml
new file mode 100644
index 0000000..cf79e17
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/pom.xml
@@ -0,0 +1,249 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-project</artifactId>
+ <version>3.4.0-SNAPSHOT</version>
+ <relativePath>../../hadoop-project</relativePath>
+ </parent>
+ <artifactId>hadoop-federation-balance</artifactId>
+ <version>3.4.0-SNAPSHOT</version>
+ <description>Apache Hadoop Federation Balance</description>
+ <name>Apache Hadoop Federation Balance</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <file.encoding>UTF-8</file.encoding>
+ <downloadSources>true</downloadSources>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-hs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-rbf</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-rbf</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ <filtering>true</filtering>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <testFailureIgnore>${ignoreTestFailure}</testFailureIgnore>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
+ <argLine>-Xmx1024m</argLine>
+ <includes>
+ <include>**/Test*.java</include>
+ </includes>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <systemProperties>
+ <property>
+ <name>test.build.data</name>
+ <value>${basedir}/target/test/data</value>
+ </property>
+ <property>
+ <name>hadoop.log.dir</name>
+ <value>target/test/logs</value>
+ </property>
+ <property>
+ <name>org.apache.commons.logging.Log</name>
+ <value>org.apache.commons.logging.impl.SimpleLog</value>
+ </property>
+ <property>
+ <name>org.apache.commons.logging.simplelog.defaultlog</name>
+ <value>warn</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ </configuration>
+ </execution>
+ <execution>
+ <id>deplist</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>list</goal>
+ </goals>
+ <configuration>
+ <!-- referenced by a built-in command -->
+ <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-builtin.txt</outputFile>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.hadoop.tools.fedbalance.FedBalance</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>prepare-jar</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>prepare-test-jar</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <configuration>
+ <attach>true</attach>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpBalanceOptions.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpBalanceOptions.java
new file mode 100644
index 0000000..704ffd9
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpBalanceOptions.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+/**
+ * Command line options of FedBalance.
+ */
+public final class DistCpBalanceOptions {
+
+ /**
+ * The private constructor prevents this class from being instantiated.
+ */
+ private DistCpBalanceOptions() {}
+
+ /**
+ * Run in router-based federation mode.
+ */
+ final static Option ROUTER = new Option("router", false,
+ "If `true` the command runs in router mode. The source path is "
+ + "taken as a mount point. It will disable write by setting the mount"
+ + " point readonly. Otherwise the command works in normal federation"
+ + " mode. The source path is taken as the full path. It will disable"
+ + " write by cancelling all permissions of the source path. The"
+ + " default value is `true`.");
+
+ /**
+ * If true, the DIFF_DISTCP stage will force close all open files when
+ * there is no diff between the source path and the dst path. Otherwise
+ * the DIFF_DISTCP stage will wait until there are no open files. The
+ * default value is `false`.
+ */
+ final static Option FORCE_CLOSE_OPEN = new Option("forceCloseOpen", false,
+ "Force close all open files if the src and dst are synced.");
+
+ /**
+ * Max number of maps to use during copy. DistCp will split work as equally
+ * as possible among these maps.
+ */
+ final static Option MAP =
+ new Option("map", true, "Max number of concurrent maps to use for copy");
+
+ /**
+ * Specify bandwidth per map in MB, accepts bandwidth as a fraction.
+ */
+ final static Option BANDWIDTH =
+ new Option("bandwidth", true, "Specify bandwidth per map in MB.");
+
+ /**
+ * Specify the delay duration (in milliseconds) before retrying the job.
+ */
+ final static Option DELAY_DURATION = new Option("delay", true,
+ "This specifies the delayed duration(millie seconds) when the job"
+ + " needs to retry. A job may retry many times and check the state"
+ + " when it waits for the distcp job to finish.");
+
+ /**
+ * Move the source path to trash after all the data are synced to the
+ * target, or delete the source directly, or skip both trash and deletion.
+ */
+ final static Option TRASH = new Option("moveToTrash", true,
+ "Move the source path to trash, or delete the source path directly,"
+ + " or skip both trash and deletion. This accepts 3 values: trash,"
+ + " delete and skip. By default the server side trash interval is"
+ + " used. If the trash is disabled in the server side, the default"
+ + " trash interval 60 minutes is used.");
+
+ final static Options CLI_OPTIONS = new Options();
+
+ static {
+ CLI_OPTIONS.addOption(ROUTER);
+ CLI_OPTIONS.addOption(FORCE_CLOSE_OPEN);
+ CLI_OPTIONS.addOption(MAP);
+ CLI_OPTIONS.addOption(BANDWIDTH);
+ CLI_OPTIONS.addOption(DELAY_DURATION);
+ CLI_OPTIONS.addOption(TRASH);
+ }
+}
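
These commons-cli options are parsed in FedBalance#run (later in this patch); a minimal sketch of that parsing, assuming `args` is the raw command line:

  CommandLineParser parser = new GnuParser();
  // stopAtNonOption=true leaves "submit <src> <dst>" in getArgs().
  CommandLine command =
      parser.parse(DistCpBalanceOptions.CLI_OPTIONS, args, true);
  boolean routerMode = command.hasOption("router");
  // "10" mirrors the default map count used by FedBalance.Builder.
  int maps = Integer.parseInt(command.getOptionValue("map", "10"));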
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
new file mode 100644
index 0000000..73fecbf
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
@@ -0,0 +1,635 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.OptionsParser;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.CURRENT_SNAPSHOT_NAME;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.LAST_SNAPSHOT_NAME;
+
+/**
+ * Copy data through distcp. Super user privilege needed.
+ *
+ * PRE_CHECK :pre-check of src and dst.
+ * INIT_DISTCP :the first round of distcp.
+ * DIFF_DISTCP :copy snapshot diff round by round until there is
+ * no diff.
+ * DISABLE_WRITE :disable write operations.
+ * FINAL_DISTCP :close all open files and do the final round distcp.
+ * FINISH :procedure finish.
+ */
+public class DistCpProcedure extends BalanceProcedure {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(DistCpProcedure.class);
+
+ /* Stages of this procedure. */
+ enum Stage {
+ PRE_CHECK, INIT_DISTCP, DIFF_DISTCP, DISABLE_WRITE, FINAL_DISTCP, FINISH
+ }
+
+ private FedBalanceContext context; // the balance context.
+ private Path src; // the source path including the source cluster.
+ private Path dst; // the dst path including the dst cluster.
+ private Configuration conf;
+ private int mapNum; // the number of map tasks.
+ private int bandWidth; // the bandwidth limit of each distcp task.
+ private String jobId; // the id of the current distcp.
+ private Stage stage; // current stage of this procedure.
+
+ /* Force close all open files when there is no diff between src and dst */
+ private boolean forceCloseOpenFiles;
+ /* Disable write by setting the mount point readonly. */
+ private boolean useMountReadOnly;
+
+ private FsPermission fPerm; // the permission of the src.
+ private AclStatus acl; // the acl of the src.
+
+ private JobClient client;
+ private DistributedFileSystem srcFs; // fs of the src cluster.
+ private DistributedFileSystem dstFs; // fs of the dst cluster.
+
+ /**
+ * Test only. In unit tests we use the LocalJobRunner to run the distcp
+ * jobs. Here we save the job to look up the job status. The localJob won't
+ * be serialized and thus won't be recovered.
+ */
+ @VisibleForTesting
+ private Job localJob;
+ /**
+ * Enable test mode. Use LocalJobRunner to run the distcp jobs.
+ */
+ @VisibleForTesting
+ static boolean enabledForTest = false;
+
+ public DistCpProcedure() {
+ }
+
+ /**
+ * The constructor of DistCpProcedure.
+ *
+ * @param name the name of the procedure.
+ * @param nextProcedure the name of the next procedure.
+ * @param delayDuration the delay duration when this procedure is delayed.
+ * @param context the federation balance context.
+ */
+ public DistCpProcedure(String name, String nextProcedure, long delayDuration,
+ FedBalanceContext context) throws IOException {
+ super(name, nextProcedure, delayDuration);
+ this.context = context;
+ this.src = context.getSrc();
+ this.dst = context.getDst();
+ this.conf = context.getConf();
+ this.client = new JobClient(conf);
+ this.stage = Stage.PRE_CHECK;
+ this.mapNum = context.getMapNum();
+ this.bandWidth = context.getBandwidthLimit();
+ this.forceCloseOpenFiles = context.getForceCloseOpenFiles();
+ this.useMountReadOnly = context.getUseMountReadOnly();
+ srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
+ dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf);
+ }
+
+ @Override
+ public boolean execute() throws RetryException, IOException {
+ LOG.info("Stage={}", stage.name());
+ switch (stage) {
+ case PRE_CHECK:
+ preCheck();
+ return false;
+ case INIT_DISTCP:
+ initDistCp();
+ return false;
+ case DIFF_DISTCP:
+ diffDistCp();
+ return false;
+ case DISABLE_WRITE:
+ disableWrite();
+ return false;
+ case FINAL_DISTCP:
+ finalDistCp();
+ return false;
+ case FINISH:
+ finish();
+ return true;
+ default:
+ throw new IOException("Unexpected stage=" + stage);
+ }
+ }
+
+ /**
+ * Pre check of src and dst.
+ */
+ void preCheck() throws IOException {
+ FileStatus status = srcFs.getFileStatus(src);
+ if (!status.isDirectory()) {
+ throw new IOException(src + " should be a directory.");
+ }
+ if (dstFs.exists(dst)) {
+ throw new IOException(dst + " already exists.");
+ }
+ if (srcFs.exists(new Path(src, HdfsConstants.DOT_SNAPSHOT_DIR))) {
+ throw new IOException(src + " shouldn't enable snapshot.");
+ }
+ updateStage(Stage.INIT_DISTCP);
+ }
+
+ /**
+ * The initial distcp. Copying src to dst.
+ */
+ void initDistCp() throws IOException, RetryException {
+ RunningJobStatus job = getCurrentJob();
+ if (job != null) {
+ // the distcp has been submitted.
+ if (job.isComplete()) {
+ jobId = null; // unset jobId because the job is done.
+ if (job.isSuccessful()) {
+ updateStage(Stage.DIFF_DISTCP);
+ return;
+ } else {
+ LOG.warn("DistCp failed. Failure={}", job.getFailureInfo());
+ }
+ } else {
+ throw new RetryException();
+ }
+ } else {
+ pathCheckBeforeInitDistcp();
+ srcFs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
+ jobId = submitDistCpJob(
+ src.toString() + HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR
+ + CURRENT_SNAPSHOT_NAME, dst.toString(), false);
+ }
+ }
+
+ /**
+ * The distcp copying diffs between LAST_SNAPSHOT_NAME and
+ * CURRENT_SNAPSHOT_NAME.
+ */
+ void diffDistCp() throws IOException, RetryException {
+ RunningJobStatus job = getCurrentJob();
+ if (job != null) {
+ if (job.isComplete()) {
+ jobId = null;
+ if (job.isSuccessful()) {
+ LOG.info("DistCp succeeded. jobId={}", job.getJobID());
+ } else {
+ throw new IOException("DistCp failed. jobId=" + job.getJobID()
+ + " failure=" + job.getFailureInfo());
+ }
+ } else {
+ throw new RetryException(); // wait job complete.
+ }
+ } else if (!verifyDiff()) {
+ if (!verifyOpenFiles() || forceCloseOpenFiles) {
+ updateStage(Stage.DISABLE_WRITE);
+ } else {
+ throw new RetryException();
+ }
+ } else {
+ submitDiffDistCp();
+ }
+ }
+
+ /**
+ * Disable write either by making the mount entry readonly or cancelling the
+ * execute permission of the source path.
+ */
+ void disableWrite() throws IOException {
+ if (useMountReadOnly) {
+ String mount = context.getMount();
+ MountTableProcedure.disableWrite(mount, conf);
+ } else {
+ // Save and cancel permission.
+ FileStatus status = srcFs.getFileStatus(src);
+ fPerm = status.getPermission();
+ acl = srcFs.getAclStatus(src);
+ srcFs.setPermission(src, FsPermission.createImmutable((short) 0));
+ }
+ updateStage(Stage.FINAL_DISTCP);
+ }
+
+ /**
+ * Enable write by restoring the x permission.
+ */
+ void restorePermission() throws IOException {
+ // restore permission.
+ dstFs.removeAcl(dst);
+ if (acl != null) {
+ dstFs.modifyAclEntries(dst, acl.getEntries());
+ }
+ if (fPerm != null) {
+ dstFs.setPermission(dst, fPerm);
+ }
+ }
+
+ /**
+ * Close all open files then submit the distcp with -diff.
+ */
+ void finalDistCp() throws IOException, RetryException {
+ // Close all open files then do the final distcp.
+ closeAllOpenFiles(srcFs, src);
+ // Final distcp.
+ RunningJobStatus job = getCurrentJob();
+ if (job != null) {
+ // the distcp has been submitted.
+ if (job.isComplete()) {
+ jobId = null; // unset jobId because the job is done.
+ if (job.isSuccessful()) {
+ updateStage(Stage.FINISH);
+ return;
+ } else {
+ throw new IOException(
+ "Final DistCp failed. Failure: " + job.getFailureInfo());
+ }
+ } else {
+ throw new RetryException();
+ }
+ } else {
+ submitDiffDistCp();
+ }
+ }
+
+ void finish() throws IOException {
+ if (!useMountReadOnly) {
+ restorePermission();
+ }
+ if (srcFs.exists(src)) {
+ cleanupSnapshot(srcFs, src);
+ }
+ if (dstFs.exists(dst)) {
+ cleanupSnapshot(dstFs, dst);
+ }
+ }
+
+ @VisibleForTesting
+ Stage getStage() {
+ return stage;
+ }
+
+ @VisibleForTesting
+ void updateStage(Stage value) {
+ String oldStage = stage == null ? "null" : stage.name();
+ String newStage = value == null ? "null" : value.name();
+ LOG.info("Stage updated from {} to {}.", oldStage, newStage);
+ stage = value;
+ }
+
+ /**
+ * Submit distcp with -diff option to do the incremental copy.
+ *
+ * | the source path | the dst path |
+ * | LAST_SNAPSHOT_NAME | LAST_SNAPSHOT_NAME |
+ * | CURRENT_SNAPSHOT_NAME |
+ *
+ * 1. Clean up all the last snapshots. If there are no last snapshots then do
+ * nothing.
+ * 2. Create the dst path snapshot named the last snapshot.
+ * 3. Rename the source path's current snapshot to the last snapshot. The dst
+ * path last snapshot and the source path last snapshot are the same now.
+ * 4. Create the current snapshot of the source path.
+ * 5. Submit the distcp job. The incremental part is from the source path last
+ * snapshot to the source path current snapshot.
+ */
+ private void submitDiffDistCp() throws IOException {
+ enableSnapshot(dstFs, dst);
+ deleteSnapshot(srcFs, src, LAST_SNAPSHOT_NAME);
+ deleteSnapshot(dstFs, dst, LAST_SNAPSHOT_NAME);
+ dstFs.createSnapshot(dst, LAST_SNAPSHOT_NAME);
+ srcFs.renameSnapshot(src, CURRENT_SNAPSHOT_NAME, LAST_SNAPSHOT_NAME);
+ srcFs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
+ jobId = submitDistCpJob(src.toString(), dst.toString(), true);
+ }
+
+ /**
+ * Close all open files. Block until all the files are closed.
+ */
+ private void closeAllOpenFiles(DistributedFileSystem dfs, Path path)
+ throws IOException {
+ String pathStr = path.toUri().getPath();
+ while (true) {
+ RemoteIterator<OpenFileEntry> iterator =
+ dfs.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES), pathStr);
+ if (!iterator.hasNext()) { // all files have been closed.
+ break;
+ }
+ while (iterator.hasNext()) {
+ OpenFileEntry e = iterator.next();
+ try {
+ srcFs.recoverLease(new Path(e.getFilePath()));
+ } catch (IOException re) {
+ // ignore recoverLease error.
+ }
+ }
+ }
+ }
+
+ /**
+ * Verify whether the src has changed since the CURRENT_SNAPSHOT_NAME snapshot.
+ *
+ * @return true if the src has changed.
+ */
+ private boolean verifyDiff() throws IOException {
+ SnapshotDiffReport diffReport =
+ srcFs.getSnapshotDiffReport(src, CURRENT_SNAPSHOT_NAME, "");
+ return diffReport.getDiffList().size() > 0;
+ }
+
+ /**
+ * Verify whether there are any open files under src.
+ *
+ * @return true if there are open files.
+ */
+ private boolean verifyOpenFiles() throws IOException {
+ RemoteIterator<OpenFileEntry> iterator = srcFs
+ .listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+ src.toString());
+ return iterator.hasNext();
+ }
+
+ private RunningJobStatus getCurrentJob() throws IOException {
+ if (jobId != null) {
+ if (enabledForTest) {
+ return getCurrentLocalJob();
+ } else {
+ RunningJob latestJob = client.getJob(JobID.forName(jobId));
+ return latestJob == null ? null : new YarnRunningJobStatus(latestJob);
+ }
+ }
+ return null;
+ }
+
+ private LocalJobStatus getCurrentLocalJob() throws IOException {
+ if (localJob != null) {
+ Job latestJob;
+ try {
+ latestJob = localJob.getCluster().getJob(JobID.forName(jobId));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ return latestJob == null ? null : new LocalJobStatus(latestJob);
+ } else {
+ return null;
+ }
+ }
+
+ private void pathCheckBeforeInitDistcp() throws IOException {
+ if (dstFs.exists(dst)) { // the dst must not exist.
+ throw new IOException("The dst path=" + dst + " already exists. The admin"
+ + " should delete it before submitting the initial distcp job.");
+ }
+ Path snapshotPath = new Path(src,
+ HdfsConstants.DOT_SNAPSHOT_DIR_SEPARATOR + CURRENT_SNAPSHOT_NAME);
+ if (srcFs.exists(snapshotPath)) {
+ throw new IOException("The src snapshot=" + snapshotPath +
+ " already exists. The admin should delete the snapshot before"
+ + " submitting the initial distcp.");
+ }
+ srcFs.allowSnapshot(src);
+ }
+
+ /**
+ * Submit distcp job and return jobId.
+ */
+ private String submitDistCpJob(String srcParam, String dstParam,
+ boolean useSnapshotDiff) throws IOException {
+ List<String> command = new ArrayList<>();
+ command.addAll(Arrays
+ .asList(new String[] {"-async", "-update", "-append", "-pruxgpcab"}));
+ if (useSnapshotDiff) {
+ command.add("-diff");
+ command.add(LAST_SNAPSHOT_NAME);
+ command.add(CURRENT_SNAPSHOT_NAME);
+ }
+ command.add("-m");
+ command.add(mapNum + "");
+ command.add("-bandwidth");
+ command.add(bandWidth + "");
+ command.add(srcParam);
+ command.add(dstParam);
+
+ Configuration config = new Configuration(conf);
+ DistCp distCp;
+ try {
+ distCp = new DistCp(config,
+ OptionsParser.parse(command.toArray(new String[]{})));
+ Job job = distCp.createAndSubmitJob();
+ LOG.info("Submit distcp job={}", job);
+ if (enabledForTest) {
+ localJob = job;
+ }
+ return job.getJobID().toString();
+ } catch (Exception e) {
+ throw new IOException("Submit job failed.", e);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ context.write(out);
+ if (jobId == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ Text.writeString(out, jobId);
+ }
+ out.writeInt(stage.ordinal());
+ if (fPerm == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeShort(fPerm.toShort());
+ }
+ if (acl == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ PBHelperClient.convert(acl).writeDelimitedTo(bout);
+ byte[] data = bout.toByteArray();
+ out.writeInt(data.length);
+ out.write(data);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ context = new FedBalanceContext();
+ context.readFields(in);
+ src = context.getSrc();
+ dst = context.getDst();
+ conf = context.getConf();
+ if (in.readBoolean()) {
+ jobId = Text.readString(in);
+ }
+ stage = Stage.values()[in.readInt()];
+ if (in.readBoolean()) {
+ fPerm = FsPermission.read(in);
+ }
+ if (in.readBoolean()) {
+ int len = in.readInt();
+ byte[] data = new byte[len];
+ in.readFully(data);
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ AclProtos.GetAclStatusResponseProto proto =
+ AclProtos.GetAclStatusResponseProto.parseDelimitedFrom(bin);
+ acl = PBHelperClient.convert(proto);
+ }
+ srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
+ dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf);
+ mapNum = context.getMapNum();
+ bandWidth = context.getBandwidthLimit();
+ forceCloseOpenFiles = context.getForceCloseOpenFiles();
+ useMountReadOnly = context.getUseMountReadOnly();
+ this.client = new JobClient(conf);
+ }
+
+ private static void enableSnapshot(DistributedFileSystem dfs, Path path)
+ throws IOException {
+ if (!dfs.exists(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR))) {
+ dfs.allowSnapshot(path);
+ }
+ }
+
+ static void deleteSnapshot(DistributedFileSystem dfs, Path path,
+ String snapshotName) throws IOException {
+ Path snapshot =
+ new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR_SEPARATOR + snapshotName);
+ if (dfs.exists(snapshot)) {
+ dfs.deleteSnapshot(path, snapshotName);
+ }
+ }
+
+ static void cleanupSnapshot(DistributedFileSystem dfs, Path path)
+ throws IOException {
+ if (dfs.exists(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR))) {
+ FileStatus[] status =
+ dfs.listStatus(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR));
+ for (FileStatus s : status) {
+ deleteSnapshot(dfs, path, s.getPath().getName());
+ }
+ dfs.disallowSnapshot(path);
+ }
+ }
+
+ interface RunningJobStatus {
+ String getJobID();
+
+ boolean isComplete() throws IOException;
+
+ boolean isSuccessful() throws IOException;
+
+ String getFailureInfo() throws IOException;
+ }
+
+ private static class YarnRunningJobStatus implements RunningJobStatus {
+
+ private final RunningJob job;
+
+ YarnRunningJobStatus(RunningJob job) {
+ this.job = job;
+ }
+
+ @Override
+ public String getJobID() {
+ return job.getID().toString();
+ }
+
+ @Override
+ public boolean isComplete() throws IOException {
+ return job.isComplete();
+ }
+
+ @Override
+ public boolean isSuccessful() throws IOException {
+ return job.isSuccessful();
+ }
+
+ @Override
+ public String getFailureInfo() throws IOException {
+ return job.getFailureInfo();
+ }
+ }
+
+ private static class LocalJobStatus implements RunningJobStatus {
+
+ private final Job testJob;
+
+ LocalJobStatus(Job testJob) {
+ this.testJob = testJob;
+ }
+
+ @Override
+ public String getJobID() {
+ return testJob.getJobID().toString();
+ }
+
+ @Override
+ public boolean isComplete() throws IOException {
+ return testJob.isComplete();
+ }
+
+ @Override
+ public boolean isSuccessful() throws IOException {
+ return testJob.isSuccessful();
+ }
+
+ @Override
+ public String getFailureInfo() throws IOException {
+ try {
+ return testJob.getStatus().getFailureInfo();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}
\ No newline at end of file
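
For orientation, submitDistCpJob above assembles plain distcp arguments; a hedged sketch of what a diff-round submission reduces to (the snapshot names come from FedBalanceConfigs, while the paths and map/bandwidth values are illustrative):

  // Equivalent CLI: hadoop distcp -async -update -append -pruxgpcab \
  //     -diff DISTCP-BALANCE-CURRENT DISTCP-BALANCE-NEXT \
  //     -m 10 -bandwidth 10 hdfs://ns1/src hdfs://ns2/dst
  // (inside a method declared to throw Exception)
  String[] distcpArgs = {"-async", "-update", "-append", "-pruxgpcab",
      "-diff", "DISTCP-BALANCE-CURRENT", "DISTCP-BALANCE-NEXT",
      "-m", "10", "-bandwidth", "10",
      "hdfs://ns1/src", "hdfs://ns2/dst"};
  DistCp distCp = new DistCp(new Configuration(),
      OptionsParser.parse(distcpArgs));
  Job job = distCp.createAndSubmitJob(); // -async: returns immediately.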
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
new file mode 100644
index 0000000..adfb40b
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.ROUTER;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.FORCE_CLOSE_OPEN;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.MAP;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.BANDWIDTH;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.TRASH;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.DELAY_DURATION;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.CLI_OPTIONS;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.FEDERATION_BALANCE_CLASS;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+
+/**
+ * Balance data from src cluster to dst cluster with distcp.
+ *
+ * 1. Move data from the source path to the destination path with distcp.
+ * 2. Update the mount entry.
+ * 3. Move the source path to trash (or delete it, per the trash option).
+ */
+public class FedBalance extends Configured implements Tool {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(FedBalance.class);
+ private static final String SUBMIT_COMMAND = "submit";
+ private static final String CONTINUE_COMMAND = "continue";
+ private static final String NO_MOUNT = "no-mount";
+ private static final String DISTCP_PROCEDURE = "distcp-procedure";
+ private static final String MOUNT_TABLE_PROCEDURE = "mount-table-procedure";
+ private static final String TRASH_PROCEDURE = "trash-procedure";
+
+ /**
+ * This class helps build the balance job.
+ */
+ private class Builder {
+ /* Balancing in an rbf cluster. */
+ private boolean routerCluster = false;
+ /* Force close all open files while there is no diff. */
+ private boolean forceCloseOpen = false;
+ /* Max number of concurrent maps to use for copy. */
+ private int map = 10;
+ /* Specify bandwidth per map in MB. */
+ private int bandwidth = 10;
+ /* Specify the trash behaviour of the source path. */
+ private TrashOption trashOpt = TrashOption.TRASH;
+ /* The delay duration (in milliseconds) when the procedure needs to retry. */
+ private long delayDuration = TimeUnit.SECONDS.toMillis(1);
+ /* The source input. This specifies the source path. */
+ private final String inputSrc;
+ /* The dst input. This specifies the dst path. */
+ private final String inputDst;
+
+ Builder(String inputSrc, String inputDst) {
+ this.inputSrc = inputSrc;
+ this.inputDst = inputDst;
+ }
+
+ /**
+ * Whether the balance runs in a router-based federation (RBF) cluster.
+ * @param value true if it's running in a router-based federation cluster.
+ */
+ public Builder setRouterCluster(boolean value) {
+ this.routerCluster = value;
+ return this;
+ }
+
+ /**
+ * Whether to force close all open files when there is no diff.
+ * @param value true if force close all the open files.
+ */
+ public Builder setForceCloseOpen(boolean value) {
+ this.forceCloseOpen = value;
+ return this;
+ }
+
+ /**
+ * Max number of concurrent maps to use for copy.
+ * @param value the map number of the distcp.
+ */
+ public Builder setMap(int value) {
+ this.map = value;
+ return this;
+ }
+
+ /**
+ * Specify bandwidth per map in MB.
+ * @param value the bandwidth.
+ */
+ public Builder setBandWidth(int value) {
+ this.bandwidth = value;
+ return this;
+ }
+
+ /**
+ * Specify the trash behaviour of the source path.
+ * @param value the trash option.
+ */
+ public Builder setTrashOpt(TrashOption value) {
+ this.trashOpt = value;
+ return this;
+ }
+
+ /**
+ * Specify the delay duration (in milliseconds) when the procedure needs to retry.
+ * @param value the delay duration of the job.
+ */
+ public Builder setDelayDuration(long value) {
+ this.delayDuration = value;
+ return this;
+ }
+
+ /**
+ * Build the balance job.
+ */
+ public BalanceJob build() throws IOException {
+ // Construct job context.
+ FedBalanceContext context;
+ Path dst = new Path(inputDst);
+ if (dst.toUri().getAuthority() == null) {
+ throw new IOException("The destination cluster must be specified.");
+ }
+ if (routerCluster) { // router-based federation.
+ Path src = getSrcPath(inputSrc);
+ String mount = inputSrc;
+ context = new FedBalanceContext.Builder(src, dst, mount, getConf())
+ .setForceCloseOpenFiles(forceCloseOpen)
+ .setUseMountReadOnly(routerCluster).setMapNum(map)
+ .setBandwidthLimit(bandwidth).setTrash(trashOpt)
+ .setDelayDuration(delayDuration).build();
+ } else { // normal federation cluster.
+ Path src = new Path(inputSrc);
+ if (src.toUri().getAuthority() == null) {
+ throw new IOException("The source cluster must be specified.");
+ }
+ context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf())
+ .setForceCloseOpenFiles(forceCloseOpen)
+ .setUseMountReadOnly(routerCluster).setMapNum(map)
+ .setBandwidthLimit(bandwidth).setTrash(trashOpt).build();
+ }
+
+ LOG.info(context.toString());
+ // Construct the balance job.
+ BalanceJob.Builder<BalanceProcedure> builder = new BalanceJob.Builder<>();
+ DistCpProcedure dcp =
+ new DistCpProcedure(DISTCP_PROCEDURE, null, delayDuration, context);
+ builder.nextProcedure(dcp);
+ if (routerCluster) {
+ MountTableProcedure mtp =
+ new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration,
+ inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(),
+ getConf());
+ builder.nextProcedure(mtp);
+ }
+ TrashProcedure tp =
+ new TrashProcedure(TRASH_PROCEDURE, null, delayDuration, context);
+ builder.nextProcedure(tp);
+ return builder.build();
+ }
+ }
+
+ public FedBalance() {
+ super();
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ CommandLineParser parser = new GnuParser();
+ CommandLine command =
+ parser.parse(DistCpBalanceOptions.CLI_OPTIONS, args, true);
+ String[] leftOverArgs = command.getArgs();
+ if (leftOverArgs == null || leftOverArgs.length < 1) {
+ printUsage();
+ return -1;
+ }
+ String cmd = leftOverArgs[0];
+ if (cmd.equals(SUBMIT_COMMAND)) {
+ if (leftOverArgs.length < 3) {
+ printUsage();
+ return -1;
+ }
+ String inputSrc = leftOverArgs[1];
+ String inputDst = leftOverArgs[2];
+ return submit(command, inputSrc, inputDst);
+ } else if (cmd.equals(CONTINUE_COMMAND)) {
+ return continueJob();
+ } else {
+ printUsage();
+ return -1;
+ }
+ }
+
+ /**
+ * Recover and continue the unfinished jobs.
+ */
+ private int continueJob() throws InterruptedException {
+ BalanceProcedureScheduler scheduler =
+ new BalanceProcedureScheduler(getConf());
+ try {
+ scheduler.init(true);
+ while (true) {
+ Collection<BalanceJob> jobs = scheduler.getAllJobs();
+ int unfinished = 0;
+ for (BalanceJob job : jobs) {
+ if (!job.isJobDone()) {
+ unfinished++;
+ }
+ LOG.info(job.toString());
+ }
+ if (unfinished == 0) {
+ break;
+ }
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ }
+ } catch (IOException e) {
+ LOG.error("Continue balance job failed.", e);
+ return -1;
+ } finally {
+ scheduler.shutDown();
+ }
+ return 0;
+ }
+
+ /**
+ * Start a ProcedureScheduler and submit the job.
+ *
+ * @param command the command options.
+ * @param inputSrc the source input. This specifies the source path.
+ * @param inputDst the dst input. This specifies the dst path.
+ */
+ private int submit(CommandLine command, String inputSrc, String inputDst)
+ throws IOException {
+ Builder builder = new Builder(inputSrc, inputDst);
+ // parse options.
+ builder.setRouterCluster(command.hasOption(ROUTER.getOpt()));
+ builder.setForceCloseOpen(command.hasOption(FORCE_CLOSE_OPEN.getOpt()));
+ if (command.hasOption(MAP.getOpt())) {
+ builder.setMap(Integer.parseInt(command.getOptionValue(MAP.getOpt())));
+ }
+ if (command.hasOption(BANDWIDTH.getOpt())) {
+ builder.setBandWidth(
+ Integer.parseInt(command.getOptionValue(BANDWIDTH.getOpt())));
+ }
+ if (command.hasOption(DELAY_DURATION.getOpt())) {
+ builder.setDelayDuration(
+ Long.parseLong(command.getOptionValue(DELAY_DURATION.getOpt())));
+ }
+ if (command.hasOption(TRASH.getOpt())) {
+ String val = command.getOptionValue(TRASH.getOpt());
+ if (val.equalsIgnoreCase("skip")) {
+ builder.setTrashOpt(TrashOption.SKIP);
+ } else if (val.equalsIgnoreCase("trash")) {
+ builder.setTrashOpt(TrashOption.TRASH);
+ } else if (val.equalsIgnoreCase("delete")) {
+ builder.setTrashOpt(TrashOption.DELETE);
+ } else {
+ printUsage();
+ return -1;
+ }
+ }
+
+ // Submit the job.
+ BalanceProcedureScheduler scheduler =
+ new BalanceProcedureScheduler(getConf());
+ scheduler.init(false);
+ try {
+ BalanceJob balanceJob = builder.build();
+ // Submit and wait until the job is done.
+ scheduler.submit(balanceJob);
+ scheduler.waitUntilDone(balanceJob);
+ } catch (IOException e) {
+ LOG.error("Submit balance job failed.", e);
+ return -1;
+ } finally {
+ scheduler.shutDown();
+ }
+ return 0;
+ }
+
+ /**
+ * Get src uri from Router.
+ */
+ private Path getSrcPath(String fedPath) throws IOException {
+ String address = getConf().getTrimmed(
+ RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+ RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
+ InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
+ RouterClient rClient = new RouterClient(routerSocket, getConf());
+ try {
+ MountTableManager mountTable = rClient.getMountTableManager();
+ MountTable entry = MountTableProcedure.getMountEntry(fedPath, mountTable);
+ if (entry == null) {
+ throw new IllegalArgumentException(
+ "The mount point doesn't exist. path=" + fedPath);
+ } else if (entry.getDestinations().size() > 1) {
+ throw new IllegalArgumentException(
+ "The mount point has more than one destination. path=" + fedPath);
+ } else {
+ String ns = entry.getDestinations().get(0).getNameserviceId();
+ String path = entry.getDestinations().get(0).getDest();
+ return new Path("hdfs://" + ns + path);
+ }
+ } finally {
+ rClient.close();
+ }
+ }
+
+ private void printUsage() {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(
+ "fedbalance OPTIONS [submit|continue] <src> <target>\n\nOPTIONS",
+ CLI_OPTIONS);
+ }
+
+ /**
+ * Main function of the FedBalance program. Parses the input arguments and
+ * invokes the FedBalance::run() method, via the ToolRunner.
+ * @param argv Command-line arguments sent to FedBalance.
+ */
+ public static void main(String[] argv) {
+ Configuration conf = new HdfsConfiguration();
+ Class<Tool> balanceClazz = (Class<Tool>) conf
+ .getClass(FEDERATION_BALANCE_CLASS, FedBalance.class);
+ Tool balancer = ReflectionUtils.newInstance(balanceClazz, conf);
+ int exitCode;
+ try {
+ exitCode = ToolRunner.run(balancer, argv);
+ } catch (Exception e) {
+ LOG.warn("Couldn't complete FedBalance operation.", e);
+ exitCode = -1;
+ }
+ System.exit(exitCode);
+ }
+}
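
Under the hood, submit() above drives the procedure framework directly; a condensed sketch of that flow, where `job` is assumed to be a BalanceJob produced by the Builder shown earlier:

  BalanceProcedureScheduler scheduler =
      new BalanceProcedureScheduler(getConf());
  scheduler.init(false);          // false: do not recover old jobs.
  try {
    scheduler.submit(job);        // journal the job, then run it.
    scheduler.waitUntilDone(job); // block until FINISH or failure.
  } finally {
    scheduler.shutDown();
  }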
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceConfigs.java
similarity index 72%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceConfigs.java
index f869035..952aef2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceConfigs.java
@@ -15,16 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.classification.InterfaceAudience;
/**
- * This class contains constants for configuration keys and default values
- * used in hdfs procedure.
+ * Federation balance configuration properties.
*/
@InterfaceAudience.Private
-public final class BalanceProcedureConfigKeys {
+public final class FedBalanceConfigs {
+ /* The class used for federation balance */
+ public static final String FEDERATION_BALANCE_CLASS =
+ "federation.balance.class";
+ public static final String LAST_SNAPSHOT_NAME = "DISTCP-BALANCE-CURRENT";
+ public static final String CURRENT_SNAPSHOT_NAME = "DISTCP-BALANCE-NEXT";
+ /* Specify the behaviour of trash. */
+ public enum TrashOption {
+ TRASH, DELETE, SKIP
+ }
+
/* The worker threads number of the BalanceProcedureScheduler */
public static final String WORK_THREAD_NUM =
"hadoop.hdfs.procedure.work.thread.num";
@@ -37,5 +46,5 @@ public final class BalanceProcedureConfigKeys {
public static final String JOURNAL_CLASS =
"hadoop.hdfs.procedure.journal.class";
- private BalanceProcedureConfigKeys() {}
+ private FedBalanceConfigs() {}
}
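
The FEDERATION_BALANCE_CLASS key lets FedBalance#main substitute a different Tool implementation; a hedged sketch, where MyFedBalance is a hypothetical Tool subclass:

  Configuration conf = new HdfsConfiguration();
  // Falls back to FedBalance.class when the key is unset.
  conf.setClass("federation.balance.class",
      MyFedBalance.class, Tool.class);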
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
new file mode 100644
index 0000000..56be7db
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+
+/**
+ * This class contains the basic information needed by a federation balance job.
+ */
+public class FedBalanceContext implements Writable {
+
+ /* the source path in the source sub-cluster */
+ private Path src;
+ /* the target path in the target sub-cluster */
+ private Path dst;
+ /* the mount point to be balanced */
+ private String mount;
+ /* Force close all open files when there is no diff between src and dst */
+ private boolean forceCloseOpenFiles;
+ /* Disable write by setting the mount point readonly. */
+ private boolean useMountReadOnly;
+ /* The map number of the distcp job. */
+ private int mapNum;
+ /* The bandwidth limit of the distcp job(MB). */
+ private int bandwidthLimit;
+ /* Move the source path to trash after all the data are synced to the
+ target. Otherwise delete the source directly. */
+ private TrashOption trashOpt;
+ /* How long will the procedures be delayed. */
+ private long delayDuration;
+
+ private Configuration conf;
+
+ public FedBalanceContext() {}
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public Path getSrc() {
+ return src;
+ }
+
+ public Path getDst() {
+ return dst;
+ }
+
+ public String getMount() {
+ return mount;
+ }
+
+ public boolean getForceCloseOpenFiles() {
+ return forceCloseOpenFiles;
+ }
+
+ public boolean getUseMountReadOnly() {
+ return useMountReadOnly;
+ }
+
+ public int getMapNum() {
+ return mapNum;
+ }
+
+ public int getBandwidthLimit() {
+ return bandwidthLimit;
+ }
+
+ public TrashOption getTrashOpt() {
+ return trashOpt;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ conf.write(out);
+ Text.writeString(out, src.toString());
+ Text.writeString(out, dst.toString());
+ Text.writeString(out, mount);
+ out.writeBoolean(forceCloseOpenFiles);
+ out.writeBoolean(useMountReadOnly);
+ out.writeInt(mapNum);
+ out.writeInt(bandwidthLimit);
+ out.writeInt(trashOpt.ordinal());
+ out.writeLong(delayDuration);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ conf = new Configuration(false);
+ conf.readFields(in);
+ src = new Path(Text.readString(in));
+ dst = new Path(Text.readString(in));
+ mount = Text.readString(in);
+ forceCloseOpenFiles = in.readBoolean();
+ useMountReadOnly = in.readBoolean();
+ mapNum = in.readInt();
+ bandwidthLimit = in.readInt();
+ trashOpt = TrashOption.values()[in.readInt()];
+ delayDuration = in.readLong();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
+ FedBalanceContext bc = (FedBalanceContext) obj;
+ return new EqualsBuilder()
+ .append(src, bc.src)
+ .append(dst, bc.dst)
+ .append(mount, bc.mount)
+ .append(forceCloseOpenFiles, bc.forceCloseOpenFiles)
+ .append(useMountReadOnly, bc.useMountReadOnly)
+ .append(mapNum, bc.mapNum)
+ .append(bandwidthLimit, bc.bandwidthLimit)
+ .append(trashOpt, bc.trashOpt)
+ .append(delayDuration, bc.delayDuration)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(src)
+ .append(dst)
+ .append(mount)
+ .append(forceCloseOpenFiles)
+ .append(useMountReadOnly)
+ .append(mapNum)
+ .append(bandwidthLimit)
+ .append(trashOpt)
+ .append(delayDuration)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder("FedBalance context:");
+ builder.append(" src=").append(src);
+ builder.append(", dst=").append(dst);
+ if (useMountReadOnly) {
+ builder.append(", router-mode=true");
+ builder.append(", mount-point=").append(mount);
+ } else {
+ builder.append(", router-mode=false");
+ }
+ builder.append(", forceCloseOpenFiles=").append(forceCloseOpenFiles);
+ builder.append(", trash=").append(trashOpt.name());
+ builder.append(", map=").append(mapNum);
+ builder.append(", bandwidth=").append(bandwidthLimit);
+ builder.append(", delayDuration=").append(delayDuration);
+ return builder.toString();
+ }
+
+ static class Builder {
+ private final Path src;
+ private final Path dst;
+ private final String mount;
+ private final Configuration conf;
+ private boolean forceCloseOpenFiles = false;
+ private boolean useMountReadOnly = false;
+ private int mapNum;
+ private int bandwidthLimit;
+ private TrashOption trashOpt;
+ private long delayDuration;
+
+ /**
+ * This class helps build the FedBalanceContext.
+ *
+ * @param src the source path in the source sub-cluster.
+ * @param dst the target path in the target sub-cluster.
+ * @param mount the mount point to be balanced.
+ * @param conf the configuration.
+ */
+ Builder(Path src, Path dst, String mount, Configuration conf) {
+ this.src = src;
+ this.dst = dst;
+ this.mount = mount;
+ this.conf = conf;
+ }
+
+ /**
+ * Force close open files.
+ * @param value true to force close all the open files.
+ */
+ public Builder setForceCloseOpenFiles(boolean value) {
+ this.forceCloseOpenFiles = value;
+ return this;
+ }
+
+ /**
+ * Disable write by setting the mount point readonly.
+ * @param value true to disable write by setting the mount point readonly.
+ */
+ public Builder setUseMountReadOnly(boolean value) {
+ this.useMountReadOnly = value;
+ return this;
+ }
+
+ /**
+ * The number of map tasks of the distcp job.
+ * @param value the number of map tasks.
+ */
+ public Builder setMapNum(int value) {
+ this.mapNum = value;
+ return this;
+ }
+
+ /**
+ * The bandwidth limit of the distcp job (MB).
+ * @param value the bandwidth limit in MB.
+ */
+ public Builder setBandwidthLimit(int value) {
+ this.bandwidthLimit = value;
+ return this;
+ }
+
+ /**
+ * Specify the trash behaviour after all the data is synced to the target.
+ * @param value the trash option.
+ */
+ public Builder setTrash(TrashOption value) {
+ this.trashOpt = value;
+ return this;
+ }
+
+ /**
+ * Specify the delay duration when the procedures need to retry.
+ * @param value the delay duration.
+ */
+ public Builder setDelayDuration(long value) {
+ this.delayDuration = value;
+ return this;
+ }
+
+ /**
+ * Build the FedBalanceContext.
+ *
+ * @return the FedBalanceContext obj.
+ */
+ public FedBalanceContext build() {
+ FedBalanceContext context = new FedBalanceContext();
+ context.src = this.src;
+ context.dst = this.dst;
+ context.mount = this.mount;
+ context.conf = this.conf;
+ context.forceCloseOpenFiles = this.forceCloseOpenFiles;
+ context.useMountReadOnly = this.useMountReadOnly;
+ context.mapNum = this.mapNum;
+ context.bandwidthLimit = this.bandwidthLimit;
+ context.trashOpt = this.trashOpt;
+ context.delayDuration = this.delayDuration;
+ return context;
+ }
+ }
+}
\ No newline at end of file
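
A minimal sketch of how the Builder above is meant to be used. The paths,
mount point and option values here are illustrative only, not part of the
patch:

    Configuration conf = new Configuration();
    FedBalanceContext context =
        new FedBalanceContext.Builder(new Path("hdfs://ns-src/foo"),
            new Path("hdfs://ns-dst/foo"), "/foo", conf)
            .setForceCloseOpenFiles(false)
            .setUseMountReadOnly(true)
            .setMapNum(10)
            .setBandwidthLimit(10)
            .setTrash(TrashOption.TRASH)
            .setDelayDuration(1000)
            .build();
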
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java
new file mode 100644
index 0000000..8f78983
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This procedure updates a mount entry so it points to the target
+ * sub-cluster.
+ * Old mount entry:
+ * /a/b/c -> {ns:src path:/a/b/c}
+ * New mount entry:
+ * /a/b/c -> {ns:dst path:/a/b/c}
+ */
+public class MountTableProcedure extends BalanceProcedure {
+
+ private String mount;
+ private String dstPath;
+ private String dstNs;
+ private Configuration conf;
+
+ public MountTableProcedure() {}
+
+ /**
+ * Update the mount entry to point to the specified destination.
+ *
+ * @param name the name of the procedure.
+ * @param nextProcedure the name of the next procedure.
+ * @param delayDuration the delay duration when this procedure is delayed.
+ * @param mount the mount entry to be updated.
+ * @param dstPath the destination path in the target sub-cluster.
+ * @param dstNs the nameservice id of the target sub-cluster.
+ * @param conf the configuration.
+ */
+ public MountTableProcedure(String name, String nextProcedure,
+ long delayDuration, String mount, String dstPath, String dstNs,
+ Configuration conf) throws IOException {
+ super(name, nextProcedure, delayDuration);
+ this.mount = mount;
+ this.dstPath = dstPath;
+ this.dstNs = dstNs;
+ this.conf = conf;
+ }
+
+ @Override
+ public boolean execute() throws RetryException, IOException {
+ updateMountTable();
+ return true;
+ }
+
+ private void updateMountTable() throws IOException {
+ updateMountTableDestination(mount, dstNs, dstPath, conf);
+ enableWrite(mount, conf);
+ }
+
+ /**
+ * Update the destination of the mount point to target namespace and target
+ * path.
+ *
+ * @param mount the mount point.
+ * @param dstNs the target namespace.
+ * @param dstPath the target path.
+ * @param conf the configuration of the router.
+ */
+ private static void updateMountTableDestination(String mount, String dstNs,
+ String dstPath, Configuration conf) throws IOException {
+ String address = conf.getTrimmed(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+ RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
+ InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
+ RouterClient rClient = new RouterClient(routerSocket, conf);
+ try {
+ MountTableManager mountTable = rClient.getMountTableManager();
+
+ MountTable originalEntry = getMountEntry(mount, mountTable);
+ if (originalEntry == null) {
+ throw new IOException("Mount table " + mount + " doesn't exist");
+ } else {
+ RemoteLocation remoteLocation =
+ new RemoteLocation(dstNs, dstPath, mount);
+ originalEntry.setDestinations(Arrays.asList(remoteLocation));
+ UpdateMountTableEntryRequest updateRequest =
+ UpdateMountTableEntryRequest.newInstance(originalEntry);
+ UpdateMountTableEntryResponse response =
+ mountTable.updateMountTableEntry(updateRequest);
+ if (!response.getStatus()) {
+ throw new IOException("Failed update mount table " + mount);
+ }
+ rClient.getMountTableManager().refreshMountTableEntries(
+ RefreshMountTableEntriesRequest.newInstance());
+ }
+ } finally {
+ rClient.close();
+ }
+ }
+
+ /**
+ * Gets the mount table entry.
+ * @param mount name of the mount entry.
+ * @param mountTable the mount table.
+ * @return corresponding mount entry.
+ * @throws IOException in case of failure to retrieve mount entry.
+ */
+ public static MountTable getMountEntry(String mount,
+ MountTableManager mountTable)
+ throws IOException {
+ GetMountTableEntriesRequest getRequest =
+ GetMountTableEntriesRequest.newInstance(mount);
+ GetMountTableEntriesResponse getResponse =
+ mountTable.getMountTableEntries(getRequest);
+ List<MountTable> results = getResponse.getEntries();
+ MountTable existingEntry = null;
+ for (MountTable result : results) {
+ if (mount.equals(result.getSourcePath())) {
+ existingEntry = result;
+ break;
+ }
+ }
+ return existingEntry;
+ }
+
+ /**
+ * Disable write by making the mount point readonly.
+ *
+ * @param mount the mount point to set readonly.
+ * @param conf the configuration of the router.
+ */
+ static void disableWrite(String mount, Configuration conf)
+ throws IOException {
+ setMountReadOnly(mount, true, conf);
+ }
+
+ /**
+ * Enable write by clearing the mount point readonly flag.
+ *
+ * @param mount the mount point to cancel readonly.
+ * @param conf the configuration of the router.
+ */
+ static void enableWrite(String mount, Configuration conf) throws IOException {
+ setMountReadOnly(mount, false, conf);
+ }
+
+ /**
+ * Enable or disable the readonly flag of the mount point.
+ *
+ * @param mount the mount point.
+ * @param readOnly true to set the mount point readonly, false to clear it.
+ * @param conf the configuration of the router.
+ */
+ private static void setMountReadOnly(String mount, boolean readOnly,
+ Configuration conf) throws IOException {
+ String address = conf.getTrimmed(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+ RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
+ InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
+ RouterClient rClient = new RouterClient(routerSocket, conf);
+ try {
+ MountTableManager mountTable = rClient.getMountTableManager();
+
+ MountTable originalEntry = getMountEntry(mount, mountTable);
+ if (originalEntry == null) {
+ throw new IOException("Mount table " + mount + " doesn't exist");
+ } else {
+ originalEntry.setReadOnly(readOnly);
+ UpdateMountTableEntryRequest updateRequest =
+ UpdateMountTableEntryRequest.newInstance(originalEntry);
+ UpdateMountTableEntryResponse response =
+ mountTable.updateMountTableEntry(updateRequest);
+ if (!response.getStatus()) {
+ throw new IOException(
+ "Failed update mount table " + mount + " with readonly="
+ + readOnly);
+ }
+ rClient.getMountTableManager().refreshMountTableEntries(
+ RefreshMountTableEntriesRequest.newInstance());
+ }
+ } finally {
+ rClient.close();
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ Text.writeString(out, mount);
+ Text.writeString(out, dstPath);
+ Text.writeString(out, dstNs);
+ conf.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ mount = Text.readString(in);
+ dstPath = Text.readString(in);
+ dstNs = Text.readString(in);
+ conf = new Configuration(false);
+ conf.readFields(in);
+ }
+
+ @VisibleForTesting
+ String getMount() {
+ return mount;
+ }
+
+ @VisibleForTesting
+ String getDstPath() {
+ return dstPath;
+ }
+
+ @VisibleForTesting
+ String getDstNs() {
+ return dstNs;
+ }
+}
\ No newline at end of file
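
A short sketch of how this procedure is constructed and run, mirroring
TestMountTableProcedure further below. The mount point, nameservice id and
router configuration are illustrative assumptions:

    // Re-point mount /foo at ns1:/foo; execute() updates the destination
    // and then re-enables write on the mount point.
    MountTableProcedure mtp = new MountTableProcedure(
        "mount-table-procedure", null, 1000, "/foo", "/foo", "ns1",
        routerConf);
    mtp.execute();
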
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/TrashProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/TrashProcedure.java
new file mode 100644
index 0000000..94ae616
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/TrashProcedure.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+
+/**
+ * This procedure moves the source path to the corresponding trash.
+ */
+public class TrashProcedure extends BalanceProcedure {
+
+ private DistributedFileSystem srcFs;
+ private FedBalanceContext context;
+ private Configuration conf;
+
+ public TrashProcedure() {}
+
+ /**
+ * The constructor of TrashProcedure.
+ *
+ * @param name the name of the procedure.
+ * @param nextProcedure the name of the next procedure.
+ * @param delayDuration the delay duration when this procedure is delayed.
+ * @param context the federation balance context.
+ */
+ public TrashProcedure(String name, String nextProcedure, long delayDuration,
+ FedBalanceContext context) throws IOException {
+ super(name, nextProcedure, delayDuration);
+ this.context = context;
+ this.conf = context.getConf();
+ this.srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
+ }
+
+ @Override
+ public boolean execute() throws IOException {
+ moveToTrash();
+ return true;
+ }
+
+ /**
+ * Move the source path to trash, delete it directly, or skip it,
+ * according to the trash option.
+ */
+ void moveToTrash() throws IOException {
+ Path src = context.getSrc();
+ if (srcFs.exists(src)) {
+ TrashOption trashOption = context.getTrashOpt();
+ switch (trashOption) {
+ case TRASH:
+ conf.setFloat(FS_TRASH_INTERVAL_KEY, 60);
+ if (!Trash.moveToAppropriateTrash(srcFs, src, conf)) {
+ throw new IOException("Failed move " + src + " to trash.");
+ }
+ break;
+ case DELETE:
+ if (!srcFs.delete(src, true)) {
+ throw new IOException("Failed delete " + src);
+ }
+ LOG.info("{} is deleted.", src);
+ break;
+ case SKIP:
+ break;
+ default:
+ throw new IOException("Unexpected trash option=" + trashOption);
+ }
+ }
+ }
+
+ public FedBalanceContext getContext() {
+ return context;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ context.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ context = new FedBalanceContext();
+ context.readFields(in);
+ conf = context.getConf();
+ srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
+ }
+}
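
A sketch of the expected usage, assuming a context built with the
FedBalanceContext.Builder as in TestTrashProcedure further below:

    FedBalanceContext context = new FedBalanceContext.Builder(
        src, dst, "/foo", conf).setTrash(TrashOption.TRASH).build();
    TrashProcedure trash =
        new TrashProcedure("trash-procedure", null, 1000, context);
    trash.moveToTrash(); // moves src to trash per the TRASH option.
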
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/package-info.java
similarity index 72%
copy from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java
copy to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/package-info.java
index 626d3b3..3007402 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/package-info.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,14 +16,10 @@
* limitations under the License.
*/
+
/**
- * Classes under this package implement a state machine used for balancing data
- * across federation namespaces.
+ * FedBalance is a tool for balancing data across federation clusters.
*/
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-
-package org.apache.hadoop.hdfs.procedure;
-
+@InterfaceAudience.Public
+package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJob.java
similarity index 99%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJob.java
index 847092a..8d5f9d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJob.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.builder.EqualsBuilder;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournal.java
similarity index 96%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournal.java
index 011ae85..da8eb74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournal.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.conf.Configurable;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournalInfoHDFS.java
similarity index 95%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournalInfoHDFS.java
index 4e759d8..0da8c36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournalInfoHDFS.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -37,9 +37,9 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.TMP_TAIL;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOB_PREFIX;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TMP_TAIL;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.JOB_PREFIX;
/**
* BalanceJournal based on HDFS. This class stores all the journals in the HDFS.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedure.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedure.java
index 6320e8f..080a7375 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedure.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -29,7 +29,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import static org.apache.hadoop.hdfs.procedure.BalanceJob.NEXT_PROCEDURE_NONE;
+import static org.apache.hadoop.tools.fedbalance.procedure.BalanceJob.NEXT_PROCEDURE_NONE;
/**
* The basic components of the Job. Extend this class to implement different
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java
index 74606c5..0f82b88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import com.google.common.annotations.VisibleForTesting;
@@ -40,9 +40,9 @@ import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM_DEFAULT;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOURNAL_CLASS;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM_DEFAULT;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.JOURNAL_CLASS;
/**
* The state machine framework consists of:
* Job: The state machine. It implements the basic logic of the
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/package-info.java
similarity index 95%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java
rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/package-info.java
index 626d3b3..cb03d13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/package-info.java
@@ -23,7 +23,7 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/shellprofile.d/hadoop-federation-balance.sh b/hadoop-tools/hadoop-federation-balance/src/main/shellprofile.d/hadoop-federation-balance.sh
new file mode 100644
index 0000000..2872c7a
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/shellprofile.d/hadoop-federation-balance.sh
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if ! declare -f hadoop_subcommand_fedbalance >/dev/null 2>/dev/null; then
+
+ if [[ "${HADOOP_SHELL_EXECNAME}" = hadoop ]]; then
+ hadoop_add_subcommand "fedbalance" client "balance data between sub-clusters"
+ fi
+
+ # this can't be indented otherwise shelldocs won't get it
+
+## @description fedbalance command for hadoop
+## @audience public
+## @stability stable
+## @replaceable yes
+function hadoop_subcommand_fedbalance
+{
+ # shellcheck disable=SC2034
+ HADOOP_CLASSNAME=org.apache.hadoop.tools.fedbalance.FedBalance
+ hadoop_add_to_classpath_tools hadoop-distcp
+ hadoop_add_to_classpath_tools hadoop-federation-balance
+}
+
+fi
\ No newline at end of file
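
With this profile installed the tool becomes a hadoop subcommand. The exact
argument syntax is defined by FedBalance and DistCpBalanceOptions (not shown
in this excerpt), so the invocation below is only a hypothetical sketch:

    hadoop fedbalance submit /foo hdfs://ns-dst/foo
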
diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java
new file mode 100644
index 0000000..ec565c3
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java
@@ -0,0 +1,446 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.tools.fedbalance.DistCpProcedure.Stage;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure.RetryException;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.net.URI;
+import java.util.Random;
+
+import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.CURRENT_SNAPSHOT_NAME;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.LAST_SNAPSHOT_NAME;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test DistCpProcedure.
+ */
+public class TestDistCpProcedure {
+ private static MiniDFSCluster cluster;
+ private static Configuration conf;
+ static final String MOUNT = "mock_mount_point";
+ private static final String SRCDAT = "srcdat";
+ private static final String DSTDAT = "dstdat";
+ private static final long BLOCK_SIZE = 1024;
+ private static final long FILE_SIZE = BLOCK_SIZE * 100;
+ private FileEntry[] srcfiles =
+ {new FileEntry(SRCDAT, true), new FileEntry(SRCDAT + "/a", false),
+ new FileEntry(SRCDAT + "/b", true),
+ new FileEntry(SRCDAT + "/b/c", false)};
+ private static String nnUri;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ DistCpProcedure.enabledForTest = true;
+ conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+ cluster.waitActive();
+
+ String workPath =
+ "hdfs://" + cluster.getNameNode().getHostAndPort() + "/procedure";
+ conf.set(SCHEDULER_JOURNAL_URI, workPath);
+
+ nnUri = FileSystem.getDefaultUri(conf).toString();
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ DistCpProcedure.enabledForTest = false;
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void testSuccessfulDistCpProcedure() throws Exception {
+ String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+ DistributedFileSystem fs =
+ (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+ createFiles(fs, testRoot, srcfiles);
+
+ Path src = new Path(testRoot, SRCDAT);
+ Path dst = new Path(testRoot, DSTDAT);
+ FsPermission originalPerm = new FsPermission(0777);
+ fs.setPermission(src, originalPerm);
+ FedBalanceContext context = buildContext(src, dst, MOUNT);
+ DistCpProcedure dcProcedure =
+ new DistCpProcedure("distcp-procedure", null, 1000, context);
+ BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
+ scheduler.init(true);
+
+ BalanceJob balanceJob =
+ new BalanceJob.Builder<>().nextProcedure(dcProcedure).build();
+ scheduler.submit(balanceJob);
+ scheduler.waitUntilDone(balanceJob);
+ assertTrue(balanceJob.isJobDone());
+ if (balanceJob.getError() != null) {
+ throw balanceJob.getError();
+ }
+ assertNull(balanceJob.getError());
+ assertTrue(fs.exists(dst));
+ assertFalse(
+ fs.exists(new Path(context.getSrc(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+ assertFalse(
+ fs.exists(new Path(context.getDst(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+ assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
+ assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
+ for (FileEntry e : srcfiles) { // verify file len.
+ if (!e.isDir) {
+ Path targetFile = new Path(testRoot, e.path.replace(SRCDAT, DSTDAT));
+ assertEquals(FILE_SIZE, fs.getFileStatus(targetFile).getLen());
+ }
+ }
+ cleanup(fs, new Path(testRoot));
+ }
+
+ @Test(timeout = 30000)
+ public void testInitDistCp() throws Exception {
+ String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+ DistributedFileSystem fs =
+ (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+ createFiles(fs, testRoot, srcfiles);
+
+ Path src = new Path(testRoot, SRCDAT);
+ Path dst = new Path(testRoot, DSTDAT);
+ // set permission.
+ fs.setPermission(src, FsPermission.createImmutable((short) 020));
+
+ FedBalanceContext context = buildContext(src, dst, MOUNT);
+ DistCpProcedure dcProcedure =
+ new DistCpProcedure("distcp-procedure", null, 1000, context);
+
+ // submit distcp.
+ try {
+ dcProcedure.initDistCp();
+ } catch (RetryException e) {
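+ // Expected on the first call: the distcp job is still running, so the
+ // procedure asks to be retried.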
+ }
+ fs.delete(new Path(src, "a"), true);
+ // wait until job done.
+ executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
+ () -> dcProcedure.initDistCp());
+ assertTrue(fs.exists(dst));
+ // Because the initial distcp copied from a snapshot taken before the
+ // delete, the file should still be copied.
+ assertTrue(fs.exists(new Path(dst, "a")));
+ cleanup(fs, new Path(testRoot));
+ }
+
+ @Test(timeout = 30000)
+ public void testDiffDistCp() throws Exception {
+ String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+ DistributedFileSystem fs =
+ (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+ createFiles(fs, testRoot, srcfiles);
+ Path src = new Path(testRoot, SRCDAT);
+ Path dst = new Path(testRoot, DSTDAT);
+
+ FedBalanceContext context = buildContext(src, dst, MOUNT);
+ DistCpProcedure dcProcedure =
+ new DistCpProcedure("distcp-procedure", null, 1000, context);
+ executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
+ () -> dcProcedure.initDistCp());
+ assertTrue(fs.exists(dst));
+
+ // move file out of src and test distcp.
+ fs.rename(new Path(src, "a"), new Path("/a"));
+ executeProcedure(dcProcedure, Stage.FINISH,
+ () -> dcProcedure.finalDistCp());
+ assertFalse(fs.exists(new Path(dst, "a")));
+ // move back file src/a and test distcp.
+ fs.rename(new Path("/a"), new Path(src, "a"));
+ executeProcedure(dcProcedure, Stage.FINISH,
+ () -> dcProcedure.finalDistCp());
+ assertTrue(fs.exists(new Path(dst, "a")));
+ // append file src/a and test.
+ OutputStream out = fs.append(new Path(src, "a"));
+ out.write("hello".getBytes());
+ out.close();
+ long len = fs.getFileStatus(new Path(src, "a")).getLen();
+ executeProcedure(dcProcedure, Stage.FINISH,
+ () -> dcProcedure.finalDistCp());
+ assertEquals(len, fs.getFileStatus(new Path(dst, "a")).getLen());
+ cleanup(fs, new Path(testRoot));
+ }
+
+ @Test(timeout = 30000)
+ public void testStageFinalDistCp() throws Exception {
+ String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+ DistributedFileSystem fs =
+ (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+ createFiles(fs, testRoot, srcfiles);
+
+ Path src = new Path(testRoot, SRCDAT);
+ Path dst = new Path(testRoot, DSTDAT);
+ // open files.
+ OutputStream out = fs.append(new Path(src, "a"));
+
+ FedBalanceContext context = buildContext(src, dst, MOUNT);
+ DistCpProcedure dcProcedure =
+ new DistCpProcedure("distcp-procedure", null, 1000, context);
+ executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
+ () -> dcProcedure.initDistCp());
+ executeProcedure(dcProcedure, Stage.FINISH,
+ () -> dcProcedure.finalDistCp());
+ // Verify all the open files have been closed.
+ intercept(RemoteException.class, "LeaseExpiredException",
+ "Expect RemoteException(LeaseExpiredException).", () -> out.close());
+ cleanup(fs, new Path(testRoot));
+ }
+
+ @Test(timeout = 30000)
+ public void testStageFinish() throws Exception {
+ String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+ DistributedFileSystem fs =
+ (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+ Path src = new Path(testRoot, SRCDAT);
+ Path dst = new Path(testRoot, DSTDAT);
+ fs.mkdirs(src);
+ fs.mkdirs(dst);
+ fs.allowSnapshot(src);
+ fs.allowSnapshot(dst);
+ fs.createSnapshot(src, LAST_SNAPSHOT_NAME);
+ fs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
+ fs.createSnapshot(dst, LAST_SNAPSHOT_NAME);
+ FsPermission originalPerm = new FsPermission(0777);
+ fs.setPermission(src, originalPerm);
+
+ // Test the finish stage.
+ FedBalanceContext context = buildContext(src, dst, MOUNT);
+ DistCpProcedure dcProcedure =
+ new DistCpProcedure("distcp-procedure", null, 1000, context);
+ dcProcedure.disableWrite();
+ dcProcedure.finish();
+
+ // Verify path and permission.
+ assertTrue(fs.exists(dst));
+ assertFalse(fs.exists(new Path(src, HdfsConstants.DOT_SNAPSHOT_DIR)));
+ assertFalse(fs.exists(new Path(dst, HdfsConstants.DOT_SNAPSHOT_DIR)));
+ assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
+ assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
+ cleanup(fs, new Path(testRoot));
+ }
+
+ @Test(timeout = 30000)
+ public void testRecoveryByStage() throws Exception {
+ String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+ DistributedFileSystem fs =
+ (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+ createFiles(fs, testRoot, srcfiles);
+
+ Path src = new Path(testRoot, SRCDAT);
+ Path dst = new Path(testRoot, DSTDAT);
+
+ FedBalanceContext context = buildContext(src, dst, MOUNT);
+ final DistCpProcedure[] dcp = new DistCpProcedure[1];
+ dcp[0] = new DistCpProcedure("distcp-procedure", null, 1000, context);
+
+ // Serialize and deserialize before each stage to verify the procedure
+ // recovers correctly from its persisted state.
+ dcp[0] = serializeProcedure(dcp[0]);
+ executeProcedure(dcp[0], Stage.INIT_DISTCP, () -> dcp[0].preCheck());
+ dcp[0] = serializeProcedure(dcp[0]);
+ executeProcedure(dcp[0], Stage.DIFF_DISTCP, () -> dcp[0].initDistCp());
+ fs.delete(new Path(src, "a"), true); // make some difference.
+ dcp[0] = serializeProcedure(dcp[0]);
+ executeProcedure(dcp[0], Stage.DISABLE_WRITE, () -> dcp[0].diffDistCp());
+ dcp[0] = serializeProcedure(dcp[0]);
+ executeProcedure(dcp[0], Stage.FINAL_DISTCP, () -> dcp[0].disableWrite());
+ dcp[0] = serializeProcedure(dcp[0]);
+ OutputStream out = fs.append(new Path(src, "b/c"));
+ executeProcedure(dcp[0], Stage.FINISH, () -> dcp[0].finalDistCp());
+ intercept(RemoteException.class, "LeaseExpiredException",
+ "Expect RemoteException(LeaseExpiredException).", () -> out.close());
+ dcp[0] = serializeProcedure(dcp[0]);
+ assertTrue(dcp[0].execute());
+ assertTrue(fs.exists(dst));
+ assertFalse(
+ fs.exists(new Path(context.getSrc(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+ assertFalse(
+ fs.exists(new Path(context.getDst(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+ cleanup(fs, new Path(testRoot));
+ }
+
+ @Test(timeout = 30000)
+ public void testShutdown() throws Exception {
+ String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+ DistributedFileSystem fs =
+ (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+ createFiles(fs, testRoot, srcfiles);
+
+ Path src = new Path(testRoot, SRCDAT);
+ Path dst = new Path(testRoot, DSTDAT);
+ FedBalanceContext context = buildContext(src, dst, MOUNT);
+ DistCpProcedure dcProcedure =
+ new DistCpProcedure("distcp-procedure", null, 1000, context);
+ BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
+ scheduler.init(true);
+
+ BalanceJob balanceJob =
+ new BalanceJob.Builder<>().nextProcedure(dcProcedure).build();
+ scheduler.submit(balanceJob);
+
+ long sleep = Math.abs(new Random().nextLong()) % 10000;
+ Thread.sleep(sleep);
+ scheduler.shutDown();
+ cleanup(fs, new Path(testRoot));
+ }
+
+ @Test(timeout = 30000)
+ public void testDisableWrite() throws Exception {
+ String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+ DistributedFileSystem fs =
+ (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+ createFiles(fs, testRoot, srcfiles);
+ Path src = new Path(testRoot, SRCDAT);
+ Path dst = new Path(testRoot, DSTDAT);
+
+ FedBalanceContext context = buildContext(src, dst, MOUNT);
+ DistCpProcedure dcProcedure =
+ new DistCpProcedure("distcp-procedure", null, 1000, context);
+ assertNotEquals(0, fs.getFileStatus(src).getPermission().toShort());
+ executeProcedure(dcProcedure, Stage.FINAL_DISTCP,
+ () -> dcProcedure.disableWrite());
+ assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
+ cleanup(fs, new Path(testRoot));
+ }
+
+ private FedBalanceContext buildContext(Path src, Path dst, String mount) {
+ return new FedBalanceContext.Builder(src, dst, mount, conf).setMapNum(10)
+ .setBandwidthLimit(1).setTrash(TrashOption.TRASH).setDelayDuration(1000)
+ .build();
+ }
+
+ interface Call {
+ void execute() throws IOException, RetryException;
+ }
+
+ /**
+ * Execute the procedure until its stage is updated to the target stage.
+ *
+ * @param procedure the procedure to be executed and verified.
+ * @param target the target stage.
+ * @param call the function executing the procedure.
+ */
+ private static void executeProcedure(DistCpProcedure procedure, Stage target,
+ Call call) throws IOException {
+ Stage stage = Stage.PRE_CHECK;
+ procedure.updateStage(stage);
+ while (stage != target) {
+ try {
+ call.execute();
+ } catch (RetryException e) {
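+ // Ignored: keep executing until the procedure reaches the target stage.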
+ } finally {
+ stage = procedure.getStage();
+ }
+ }
+ }
+
+ static class FileEntry {
+ private String path;
+ private boolean isDir;
+
+ FileEntry(String path, boolean isDir) {
+ this.path = path;
+ this.isDir = isDir;
+ }
+
+ String getPath() {
+ return path;
+ }
+
+ boolean isDirectory() {
+ return isDir;
+ }
+ }
+
+ /**
+ * Create directories and files with random data.
+ *
+ * @param fs the file system obj.
+ * @param topdir the base dir of the directories and files.
+ * @param entries the directory and file entries to be created.
+ */
+ private void createFiles(DistributedFileSystem fs, String topdir,
+ FileEntry[] entries) throws IOException {
+ long seed = System.currentTimeMillis();
+ Random rand = new Random(seed);
+ short replicationFactor = 2;
+ for (FileEntry entry : entries) {
+ Path newPath = new Path(topdir + "/" + entry.getPath());
+ if (entry.isDirectory()) {
+ fs.mkdirs(newPath);
+ } else {
+ int bufSize = 128;
+ DFSTestUtil.createFile(fs, newPath, bufSize, FILE_SIZE, BLOCK_SIZE,
+ replicationFactor, seed);
+ }
+ seed = System.currentTimeMillis() + rand.nextLong();
+ }
+ }
+
+ private DistCpProcedure serializeProcedure(DistCpProcedure dcp)
+ throws IOException {
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ DataOutput dataOut = new DataOutputStream(bao);
+ dcp.write(dataOut);
+ dcp = new DistCpProcedure();
+ dcp.readFields(
+ new DataInputStream(new ByteArrayInputStream(bao.toByteArray())));
+ return dcp;
+ }
+
+ private void cleanup(DistributedFileSystem dfs, Path root)
+ throws IOException {
+ Path src = new Path(root, SRCDAT);
+ Path dst = new Path(root, DSTDAT);
+ DistCpProcedure.cleanupSnapshot(dfs, src);
+ DistCpProcedure.cleanupSnapshot(dfs, dst);
+ dfs.delete(root, true);
+ }
+}
diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java
new file mode 100644
index 0000000..9dd4e5d
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Basic tests of MountTableProcedure.
+ */
+public class TestMountTableProcedure {
+
+ private static StateStoreDFSCluster cluster;
+ private static RouterContext routerContext;
+ private static Configuration routerConf;
+ private static List<MountTable> mockMountTable;
+ private static StateStoreService stateStore;
+
+ @BeforeClass
+ public static void globalSetUp() throws Exception {
+ cluster = new StateStoreDFSCluster(false, 1);
+ // Build and start a router with State Store + admin + RPC
+ Configuration conf = new RouterConfigBuilder()
+ .stateStore()
+ .admin()
+ .rpc()
+ .build();
+ cluster.addRouterOverrides(conf);
+ cluster.startRouters();
+ routerContext = cluster.getRandomRouter();
+ mockMountTable = cluster.generateMockMountTable();
+ Router router = routerContext.getRouter();
+ stateStore = router.getStateStore();
+
+ // Add two name services for testing
+ ActiveNamenodeResolver membership = router.getNamenodeResolver();
+ membership.registerNamenode(createNamenodeReport("ns0", "nn1",
+ HAServiceProtocol.HAServiceState.ACTIVE));
+ membership.registerNamenode(createNamenodeReport("ns1", "nn1",
+ HAServiceProtocol.HAServiceState.ACTIVE));
+ stateStore.refreshCaches(true);
+
+ routerConf = new Configuration();
+ InetSocketAddress routerSocket = router.getAdminServerAddress();
+ routerConf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+ routerSocket);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ cluster.stopRouter(routerContext);
+ }
+
+ @Before
+ public void testSetup() throws Exception {
+ assertTrue(
+ synchronizeRecords(stateStore, mockMountTable, MountTable.class));
+ // Avoid running with random users
+ routerContext.resetAdminClient();
+ }
+
+ @Test
+ public void testUpdateMountpoint() throws Exception {
+ // First add the mount entry: /test-path -> {ns0, /test-path}.
+ String mount = "/test-path";
+ String dst = "/test-dst";
+ MountTable newEntry = MountTable
+ .newInstance(mount, Collections.singletonMap("ns0", mount),
+ Time.now(), Time.now());
+ MountTableManager mountTable =
+ routerContext.getAdminClient().getMountTableManager();
+ AddMountTableEntryRequest addRequest =
+ AddMountTableEntryRequest.newInstance(newEntry);
+ AddMountTableEntryResponse addResponse =
+ mountTable.addMountTableEntry(addRequest);
+ assertTrue(addResponse.getStatus());
+ // verify the mount entry is added successfully.
+ GetMountTableEntriesRequest request =
+ GetMountTableEntriesRequest.newInstance("/");
+ stateStore.loadCache(MountTableStoreImpl.class, true); // load cache.
+ GetMountTableEntriesResponse response =
+ mountTable.getMountTableEntries(request);
+ assertEquals(3, response.getEntries().size());
+
+ // set the mount table to readonly.
+ MountTableProcedure.disableWrite(mount, routerConf);
+
+ // test MountTableProcedure updates the mount point.
+ String dstNs = "ns1";
+ MountTableProcedure smtp =
+ new MountTableProcedure("single-mount-table-procedure", null,
+ 1000, mount, dst, dstNs, routerConf);
+ assertTrue(smtp.execute());
+ stateStore.loadCache(MountTableStoreImpl.class, true); // load cache.
+ // Verify the mount entry is updated to the new destination.
+ MountTable entry =
+ MountTableProcedure.getMountEntry(mount, mountTable);
+ assertNotNull(entry);
+ assertEquals(1, entry.getDestinations().size());
+ String nsId = entry.getDestinations().get(0).getNameserviceId();
+ String dstPath = entry.getDestinations().get(0).getDest();
+ assertEquals(dstNs, nsId);
+ assertEquals(dst, dstPath);
+ // Verify the mount table is not readonly.
+ URI address = routerContext.getFileSystemURI();
+ DFSClient routerClient = new DFSClient(address, routerConf);
+ MountTableProcedure.enableWrite(mount, routerConf);
+ intercept(RemoteException.class, "No namenode available to invoke mkdirs",
+ "Expect no namenode exception.", () -> routerClient
+ .mkdirs(mount + "/file", new FsPermission(020), false));
+ }
+
+ @Test
+ public void testDisableAndEnableWrite() throws Exception {
+ // First add the mount entry: /test-write -> {ns0, /test-write}.
+ String mount = "/test-write";
+ MountTable newEntry = MountTable
+ .newInstance(mount, Collections.singletonMap("ns0", mount),
+ Time.now(), Time.now());
+ MountTableManager mountTable =
+ routerContext.getAdminClient().getMountTableManager();
+ AddMountTableEntryRequest addRequest =
+ AddMountTableEntryRequest.newInstance(newEntry);
+ AddMountTableEntryResponse addResponse =
+ mountTable.addMountTableEntry(addRequest);
+ assertTrue(addResponse.getStatus());
+ stateStore.loadCache(MountTableStoreImpl.class, true); // load cache.
+
+ // Construct client.
+ URI address = routerContext.getFileSystemURI();
+ DFSClient routerClient = new DFSClient(address, routerConf);
+ // Verify the mount point is not readonly.
+ intercept(RemoteException.class, "No namenode available to invoke mkdirs",
+ "Expect no namenode exception.", () -> routerClient
+ .mkdirs(mount + "/file", new FsPermission(020), false));
+
+ // Verify disable write.
+ MountTableProcedure.disableWrite(mount, routerConf);
+ intercept(RemoteException.class, "is in a read only mount point",
+ "Expect readonly exception.", () -> routerClient
+ .mkdirs(mount + "/dir", new FsPermission(020), false));
+
+ // Verify enable write.
+ MountTableProcedure.enableWrite(mount, routerConf);
+ intercept(RemoteException.class, "No namenode available to invoke mkdirs",
+ "Expect no namenode exception.", () -> routerClient
+ .mkdirs(mount + "/file", new FsPermission(020), false));
+ }
+
+ @Test
+ public void testSeDeserialize() throws Exception {
+ String fedPath = "/test-path";
+ String dst = "/test-dst";
+ String dstNs = "ns1";
+ MountTableProcedure smtp =
+ new MountTableProcedure("single-mount-table-procedure", null,
+ 1000, fedPath, dst, dstNs, routerConf);
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ DataOutput dataOut = new DataOutputStream(bao);
+ smtp.write(dataOut);
+ smtp = new MountTableProcedure();
+ smtp.readFields(
+ new DataInputStream(new ByteArrayInputStream(bao.toByteArray())));
+ assertEquals(fedPath, smtp.getMount());
+ assertEquals(dst, smtp.getDstPath());
+ assertEquals(dstNs, smtp.getDstNs());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestTrashProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestTrashProcedure.java
new file mode 100644
index 0000000..a128932
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestTrashProcedure.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test TrashProcedure.
+ */
+public class TestTrashProcedure {
+
+ private static Configuration conf;
+ private static MiniDFSCluster cluster;
+ private static String nnUri;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ conf = new Configuration();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+ cluster.waitActive();
+ nnUri = FileSystem.getDefaultUri(conf).toString();
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testTrashProcedure() throws Exception {
+ Path src = new Path("/" + getMethodName() + "-src");
+ Path dst = new Path("/" + getMethodName() + "-dst");
+ FileSystem fs = cluster.getFileSystem();
+ fs.mkdirs(src);
+ fs.mkdirs(new Path(src, "dir"));
+ assertTrue(fs.exists(src));
+
+ FedBalanceContext context =
+ new FedBalanceContext.Builder(src, dst, TestDistCpProcedure.MOUNT, conf)
+ .setMapNum(10).setBandwidthLimit(1).setTrash(TrashOption.TRASH)
+ .build();
+ TrashProcedure trashProcedure =
+ new TrashProcedure("trash-procedure", null, 1000, context);
+ trashProcedure.moveToTrash();
+ assertFalse(fs.exists(src));
+ }
+
+ @Test
+ public void testSeDeserialize() throws Exception {
+ Path src = new Path("/" + getMethodName() + "-src");
+ Path dst = new Path("/" + getMethodName() + "-dst");
+ FedBalanceContext context =
+ new FedBalanceContext.Builder(src, dst, TestDistCpProcedure.MOUNT, conf)
+ .setMapNum(10).setBandwidthLimit(1).setTrash(TrashOption.TRASH)
+ .build();
+ TrashProcedure trashProcedure =
+ new TrashProcedure("trash-procedure", null, 1000, context);
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ DataOutput dataOut = new DataOutputStream(bao);
+ trashProcedure.write(dataOut);
+ trashProcedure = new TrashProcedure();
+ trashProcedure.readFields(
+ new DataInputStream(new ByteArrayInputStream(bao.toByteArray())));
+ assertEquals(context, trashProcedure.getContext());
+ }
+}
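
Note on the ser/de test above: it exercises the Writable-style contract used across the procedure framework, where write(DataOutput) persists a procedure's state to the journal and a no-arg constructor plus readFields(DataInput) restores it on recovery. Below is a minimal self-contained sketch of that round trip; ExampleRecord is a hypothetical stand-in for illustration, not a class from this patch.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    /** Hypothetical record following the same write/readFields contract. */
    public class ExampleRecord {
      private String name;
      private long delayDuration;

      public ExampleRecord() {} // no-arg constructor, required before readFields()

      public ExampleRecord(String name, long delayDuration) {
        this.name = name;
        this.delayDuration = delayDuration;
      }

      public void write(DataOutput out) throws IOException { // journal write path
        out.writeUTF(name);
        out.writeLong(delayDuration);
      }

      public void readFields(DataInput in) throws IOException { // recovery path
        name = in.readUTF();
        delayDuration = in.readLong();
      }

      public static void main(String[] args) throws IOException {
        ExampleRecord before = new ExampleRecord("trash-procedure", 1000L);
        ByteArrayOutputStream bao = new ByteArrayOutputStream();
        before.write(new DataOutputStream(bao)); // serialize, as the journal would
        ExampleRecord after = new ExampleRecord();
        after.readFields(new DataInputStream(
            new ByteArrayInputStream(bao.toByteArray()))); // restore on "recovery"
        System.out.println(after.name + " " + after.delayDuration); // round trip intact
      }
    }
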
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/MultiPhaseProcedure.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/MultiPhaseProcedure.java
index 27cfebd..b9c9c1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/MultiPhaseProcedure.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RecordProcedure.java
similarity index 96%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RecordProcedure.java
index 706d4a1..9754b09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RecordProcedure.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import java.util.ArrayList;
import java.util.List;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RetryProcedure.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RetryProcedure.java
index 336873e..faec834 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RetryProcedure.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import java.io.DataInput;
import java.io.DataOutput;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/TestBalanceProcedureScheduler.java
similarity index 98%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/TestBalanceProcedureScheduler.java
index 39e000b..7a2b449 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/TestBalanceProcedureScheduler.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -43,8 +43,8 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI;
-import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertEquals;
@@ -70,6 +70,7 @@ public class TestBalanceProcedureScheduler {
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs:///");
CONF.setBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, true);
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+ CONF.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
CONF.setInt(WORK_THREAD_NUM, 1);
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
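
Note on the added DFS_NAMENODE_MIN_BLOCK_SIZE_KEY = 0 line: it relaxes the NameNode's dfs.namenode.fs-limits.min-block-size check, presumably so files created with the test's small DEFAULT_BLOCK_SIZE are accepted; the default minimum (1 MB) would otherwise reject the tiny journal files these tests write.
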
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/UnrecoverableProcedure.java
similarity index 96%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/UnrecoverableProcedure.java
index 941d0a0..804f1aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/UnrecoverableProcedure.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import java.io.IOException;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/WaitProcedure.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java
rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/WaitProcedure.java
index 8666caf..af46b17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/WaitProcedure.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.procedure;
+package org.apache.hadoop.tools.fedbalance.procedure;
import org.apache.hadoop.util.Time;
diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml
index f923bb7..cc811fc 100644
--- a/hadoop-tools/hadoop-tools-dist/pom.xml
+++ b/hadoop-tools/hadoop-tools-dist/pom.xml
@@ -46,6 +46,11 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-federation-balance</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-archives</artifactId>
<scope>compile</scope>
</dependency>
diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml
index eb0a31a..f026bc2 100644
--- a/hadoop-tools/pom.xml
+++ b/hadoop-tools/pom.xml
@@ -32,6 +32,7 @@
<modules>
<module>hadoop-streaming</module>
<module>hadoop-distcp</module>
+ <module>hadoop-federation-balance</module>
<module>hadoop-dynamometer</module>
<module>hadoop-archives</module>
<module>hadoop-archive-logs</module>
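
With the module wired into hadoop-tools-dist and the shellprofile installed under libexec/shellprofile.d, the balance tool should surface as a hadoop subcommand. A hedged sketch of an invocation follows, assuming the subcommand name and option spellings suggested by FedBalance and DistCpBalanceOptions in this patch; treat the exact flags and values as assumptions rather than confirmed usage.

    # submit a balance job: migrate /foo/src to the target namespace with
    # 10 maps, 10 MB/s per map, moving the source to trash at commit time
    hadoop fedbalance -map 10 -bandwidth 10 -trash trash submit /foo/src hdfs://namespace-1/foo/dst

    # recover and continue unfinished jobs from the shared journal
    hadoop fedbalance continue
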