Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/02/25 01:50:43 UTC

[30/31] hadoop git commit: HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Contributed by Anu Engineer)

HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Contributed by Anu Engineer)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d1c39f9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d1c39f9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d1c39f9

Branch: refs/heads/HDFS-1312
Commit: 0d1c39f9b6d95eccfe07010dd9052aee326ff0c9
Parents: a140b21
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Feb 24 16:49:30 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Feb 24 16:49:30 2016 -0800

----------------------------------------------------------------------
 .../ClientDatanodeProtocolTranslatorPB.java     |  11 +-
 .../server/datanode/DiskBalancerWorkStatus.java | 194 +++++++++++++++++--
 .../src/main/proto/ClientDatanodeProtocol.proto |   5 +-
 .../hadoop-hdfs/HDFS-1312_CHANGES.txt           |   5 +-
 ...tDatanodeProtocolServerSideTranslatorPB.java |   5 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |   8 +-
 .../hdfs/server/datanode/DiskBalancer.java      |  39 ++++
 .../diskbalancer/TestDiskBalancerRPC.java       |  26 ++-
 8 files changed, 249 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d1c39f9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 524d127..359d490 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBa
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
@@ -380,10 +381,14 @@ public class ClientDatanodeProtocolTranslatorPB implements
           QueryPlanStatusRequestProto.newBuilder().build();
       QueryPlanStatusResponseProto response =
           rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request);
-      return new DiskBalancerWorkStatus(response.hasResult() ?
-          response.getResult() : 0,
+      DiskBalancerWorkStatus.Result result = Result.NO_PLAN;
+      if (response.hasResult()) {
+        result = DiskBalancerWorkStatus.Result.values()[
+            response.getResult()];
+      }
+
+      return new DiskBalancerWorkStatus(result,
           response.hasPlanID() ? response.getPlanID() : null,
-          response.hasStatus() ? response.getStatus() : null,
           response.hasCurrentStatus() ? response.getCurrentStatus() : null);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
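
The translator above decodes the proto's uint32 result by indexing into
Result.values(), so the integer each enum constant declares must stay in
sync with that constant's ordinal. A minimal sketch of the round-trip
(not part of the patch), using only the four constants this patch declares:

    // Server side: enum -> wire integer (getIntResult(), added below).
    int wire = Result.PLAN_DONE.getIntResult();      // 2

    // Client side: wire integer -> enum by ordinal, as the translator does.
    Result decoded = Result.values()[wire];          // Result.PLAN_DONE

    // Holds only while each constant's declared int equals its ordinal.
    assert decoded.getIntResult() == wire;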

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d1c39f9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java
index 6b29ce8..d6943cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java
@@ -19,8 +19,17 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import static org.codehaus.jackson.map.type.TypeFactory.defaultInstance;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.LinkedList;
 
 /**
  * Helper class that reports how much work has been done by the node.
@@ -28,33 +37,69 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class DiskBalancerWorkStatus {
-  private final int result;
-  private final String planID;
-  private final String status;
-  private final String currentState;
+
+  private final List<DiskBalancerWorkEntry> currentState;
+  private Result result;
+  private String planID;
+
+  /**
+   * Constructs a default workStatus Object.
+   */
+  public DiskBalancerWorkStatus() {
+    this.currentState = new LinkedList<>();
+  }
+
+  /**
+   * Constructs a workStatus Object.
+   *
+   * @param result - Result
+   * @param planID - Plan ID
+   */
+  public DiskBalancerWorkStatus(Result result, String planID) {
+    this();
+    this.result = result;
+    this.planID = planID;
+  }
 
   /**
    * Constructs a workStatus Object.
    *
    * @param result       - Result
    * @param planID       - Plan ID
-   * @param status       - Current Status
    * @param currentState - Current State
    */
-  public DiskBalancerWorkStatus(int result, String planID, String status,
-                                String currentState) {
+  public DiskBalancerWorkStatus(Result result, String planID,
+                                List<DiskBalancerWorkEntry> currentState) {
     this.result = result;
     this.planID = planID;
-    this.status = status;
     this.currentState = currentState;
   }
 
+
+  /**
+   * Constructs a workStatus Object.
+   *
+   * @param result       - Result
+   * @param planID       - Plan ID
+   * @param currentState - Json string of WorkEntries.
+   */
+  public DiskBalancerWorkStatus(Result result, String planID,
+                                String currentState) throws IOException {
+    this.result = result;
+    this.planID = planID;
+    ObjectMapper mapper = new ObjectMapper();
+    this.currentState = mapper.readValue(currentState,
+        defaultInstance().constructCollectionType(
+            List.class, DiskBalancerWorkEntry.class));
+  }
+
+
   /**
    * Returns result.
    *
    * @return Result
    */
-  public int getResult() {
+  public Result getResult() {
     return result;
   }
 
@@ -68,20 +113,135 @@ public class DiskBalancerWorkStatus {
   }
 
   /**
-   * Returns Status.
+   * Gets the current state.
    *
-   * @return String
+   * @return - List of DiskBalancerWorkEntry
    */
-  public String getStatus() {
-    return status;
+  public List<DiskBalancerWorkEntry> getCurrentState() {
+    return currentState;
   }
 
   /**
-   * Gets current Status.
+   * Returns the current state as a JSON string.
    *
-   * @return - Json String
+   * @throws IOException
+   **/
+  public String getCurrentStateString() throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.writeValueAsString(currentState);
+  }
+
+
+  /**
+   * Adds a new work entry to the list.
+   *
+   * @param entry - DiskBalancerWorkEntry
    */
-  public String getCurrentState() {
-    return currentState;
+
+  public void addWorkEntry(DiskBalancerWorkEntry entry) {
+    Preconditions.checkNotNull(entry);
+    currentState.add(entry);
+  }
+
+  /** Various result values. **/
+  public enum Result {
+    NO_PLAN(0),
+    PLAN_UNDER_PROGRESS(1),
+    PLAN_DONE(2),
+    PLAN_CANCELLED(3);
+    private int result;
+
+    private Result(int result) {
+      this.result = result;
+    }
+
+    /**
+     * Get int value of result.
+     *
+     * @return int
+     */
+    public int getIntResult() {
+      return result;
+    }
+  }
+
+  /**
+   * A class that is used to report each work item that we are working on. This
+   * class describes the source, the destination, how much data has already
+   * been moved, errors encountered, etc. This is useful for the disk balancer
+   * stats as well as the queryStatus RPC.
+   */
+  public static class DiskBalancerWorkEntry {
+    private String sourcePath;
+    private String destPath;
+    private DiskBalancerWorkItem workItem;
+
+    /**
+     * Constructs a Work Entry class.
+     *
+     * @param sourcePath - Source Path where we are moving data from.
+     * @param destPath   - Destination path to where we are moving data to.
+     * @param workItem   - Current work status of this move.
+     */
+    public DiskBalancerWorkEntry(String sourcePath, String destPath,
+                                 DiskBalancerWorkItem workItem) {
+      this.sourcePath = sourcePath;
+      this.destPath = destPath;
+      this.workItem = workItem;
+    }
+
+    /**
+     * Returns the source path.
+     *
+     * @return - Source path
+     */
+    public String getSourcePath() {
+      return sourcePath;
+    }
+
+    /**
+     * Sets the Source Path.
+     *
+     * @param sourcePath - Volume Path.
+     */
+    public void setSourcePath(String sourcePath) {
+      this.sourcePath = sourcePath;
+    }
+
+    /**
+     * Gets the Destination path.
+     *
+     * @return - Path
+     */
+    public String getDestPath() {
+      return destPath;
+    }
+
+    /**
+     * Sets the destination path.
+     *
+     * @param destPath - Path
+     */
+    public void setDestPath(String destPath) {
+      this.destPath = destPath;
+    }
+
+    /**
+     * Gets the current status of work for these volumes.
+     *
+     * @return - Work Item
+     */
+    public DiskBalancerWorkItem getWorkItem() {
+      return workItem;
+    }
+
+    /**
+     * Sets the work item.
+     *
+     * @param workItem - sets the work item information
+     */
+    public void setWorkItem(DiskBalancerWorkItem workItem) {
+      this.workItem = workItem;
+    }
   }
 }
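
DiskBalancerWorkStatus now keeps its current state as a list of
DiskBalancerWorkEntry objects and uses Jackson to convert that list to and
from JSON, which is what travels in the RPC's currentStatus string. A short
usage sketch (not from the patch); planID and workItem are assumed to come
from a running balancer and are not constructed here:

    DiskBalancerWorkStatus status =
        new DiskBalancerWorkStatus(Result.PLAN_UNDER_PROGRESS, planID);
    status.addWorkEntry(
        new DiskBalancerWorkEntry("/data/disk1", "/data/disk2", workItem));

    // JSON string carried in QueryPlanStatusResponseProto.currentStatus;
    // getCurrentStateString() throws IOException, handling elided here.
    String json = status.getCurrentStateString();

The String-taking constructor runs the reverse mapping on the client,
rebuilding the List<DiskBalancerWorkEntry> from that JSON.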

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d1c39f9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
index 175a571..d865f9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
@@ -187,9 +187,8 @@ message QueryPlanStatusRequestProto {
  */
 message QueryPlanStatusResponseProto {
   optional uint32 result = 1;
-  optional string status = 2;
-  optional string planID = 3;
-  optional string currentStatus = 4;
+  optional string planID = 2;
+  optional string currentStatus = 3;
 }
 
 /**
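
Note that removing the status field renumbers the remaining optional fields
(planID 3 -> 2, currentStatus 4 -> 3), which changes the wire format of
QueryPlanStatusResponseProto. That is acceptable on the unreleased HDFS-1312
feature branch; a released protocol would instead keep the old field numbers
and simply stop setting the retired field.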

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d1c39f9/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
index 27de7d0..07403cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
@@ -24,6 +24,9 @@ HDFS-1312 Change Log
     HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer via
     Arpit Agarwal)
 
-    HDFS-9671skBalancer: SubmitPlan implementation. (Anu Engineer via
+    HDFS-9671. DiskBalancer: SubmitPlan implementation. (Anu Engineer via
+    Arpit Agarwal)
+
+    HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Anu Engineer via
     Arpit Agarwal)
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d1c39f9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index 782fbe7..e3f1119 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -281,10 +281,9 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       DiskBalancerWorkStatus result = impl.queryDiskBalancerPlan();
       return QueryPlanStatusResponseProto
           .newBuilder()
-          .setResult(result.getResult())
+          .setResult(result.getResult().getIntResult())
           .setPlanID(result.getPlanID())
-          .setStatus(result.getStatus())
-          .setCurrentStatus(result.getCurrentState())
+          .setCurrentStatus(result.getCurrentStateString())
           .build();
     } catch (Exception e) {
       throw new ServiceException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d1c39f9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 405a31b..f440a3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3288,11 +3288,15 @@ public class DataNode extends ReconfigurableBase
         DiskBalancerException.Result.INTERNAL_ERROR);
   }
 
+  /**
+   * Returns the status of the current or last executed work plan.
+   * @return DiskBalancerWorkStatus.
+   * @throws IOException
+   */
   @Override
   public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
     checkSuperuserPrivilege();
-    throw new DiskBalancerException("Not Implemented",
-        DiskBalancerException.Result.INTERNAL_ERROR);
+    return this.diskBalancer.queryWorkStatus();
   }
 
   /**
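
With queryDiskBalancerPlan() now delegating to DiskBalancer#queryWorkStatus,
a client can poll a DataNode until its submitted plan completes. A rough
polling sketch (not part of the patch); proxy is assumed to be a
ClientDatanodeProtocol handle to the target DataNode, the interval is
arbitrary, and IOException/InterruptedException handling is elided:

    DiskBalancerWorkStatus status = proxy.queryDiskBalancerPlan();
    while (status.getResult() == Result.PLAN_UNDER_PROGRESS) {
      Thread.sleep(1000);
      status = proxy.queryDiskBalancerPlan();
    }
    // getResult() is now PLAN_DONE, PLAN_CANCELLED, or NO_PLAN.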

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d1c39f9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 1c8ba4cf..c01fb4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -25,6 +25,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.DiskBalancerWorkEntry;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
@@ -68,6 +70,7 @@ public class DiskBalancer {
   private ExecutorService scheduler;
   private Future future;
   private String planID;
+  private DiskBalancerWorkStatus.Result currentResult;
 
   /**
    * Constructs a Disk Balancer object. This object takes care of reading a
@@ -79,6 +82,7 @@ public class DiskBalancer {
    */
   public DiskBalancer(String dataNodeUUID,
                       Configuration conf, BlockMover blockMover) {
+    this.currentResult = Result.NO_PLAN;
     this.blockMover = blockMover;
     this.dataset = this.blockMover.getDataset();
     this.dataNodeUUID = dataNodeUUID;
@@ -97,6 +101,7 @@ public class DiskBalancer {
     lock.lock();
     try {
       this.isDiskBalancerEnabled = false;
+      this.currentResult = Result.NO_PLAN;
       if ((this.future != null) && (!this.future.isDone())) {
         this.blockMover.setExitFlag();
         shutdownExecutor();
@@ -151,6 +156,7 @@ public class DiskBalancer {
           verifyPlan(planID, planVersion, plan, bandwidth, force);
       createWorkPlan(nodePlan);
       this.planID = planID;
+      this.currentResult = Result.PLAN_UNDER_PROGRESS;
       executePlan();
     } finally {
       lock.unlock();
@@ -158,6 +164,39 @@ public class DiskBalancer {
   }
 
   /**
+   * Returns the current work status of a submitted plan.
+   *
+   * @return DiskBalancerWorkStatus.
+   * @throws DiskBalancerException
+   */
+  public DiskBalancerWorkStatus queryWorkStatus() throws DiskBalancerException {
+    lock.lock();
+    try {
+      checkDiskBalancerEnabled();
+      // if we had a plan in progress, check if it is finished.
+      if (this.currentResult == Result.PLAN_UNDER_PROGRESS &&
+          this.future != null &&
+          this.future.isDone()) {
+        this.currentResult = Result.PLAN_DONE;
+      }
+
+      DiskBalancerWorkStatus status =
+          new DiskBalancerWorkStatus(this.currentResult, this.planID);
+      for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
+          workMap.entrySet()) {
+        DiskBalancerWorkEntry workEntry = new DiskBalancerWorkEntry(
+            entry.getKey().getSource().getBasePath(),
+            entry.getKey().getDest().getBasePath(),
+            entry.getValue());
+        status.addWorkEntry(workEntry);
+      }
+      return status;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
    * Throws if Disk balancer is disabled.
    *
    * @throws DiskBalancerException

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d1c39f9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
index dc24787..974e973 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
@@ -36,6 +37,9 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
+import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS;
+
 public class TestDiskBalancerRPC {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
@@ -134,28 +138,20 @@ public class TestDiskBalancerRPC {
     Assert.assertEquals(cluster.getDataNodes().size(),
         diskBalancerCluster.getNodes().size());
     diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
-    DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0);
+    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
+    DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
+        dataNode.getDatanodeUuid());
     GreedyPlanner planner = new GreedyPlanner(10.0f, node);
     NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
         ());
     planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
 
-    final int planVersion = 0; // So far we support only one version.
-    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
+    final int planVersion = 1; // So far we support only one version.
     String planHash = DigestUtils.sha512Hex(plan.toJson());
-
-    // Since submitDiskBalancerPlan is not implemented yet, it throws an
-    // Exception, this will be modified with the actual implementation.
-    try {
       dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
-    } catch (DiskBalancerException ex) {
-      // Let us ignore this for time being.
-    }
-
-    // TODO : This will be fixed when we have implementation for this
-    // function in server side.
-    thrown.expect(DiskBalancerException.class);
-    dataNode.queryDiskBalancerPlan();
+    DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
+    Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS ||
+        status.getResult() == PLAN_DONE);
   }
 
   @Test