You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/06/19 02:15:26 UTC

[incubator-nemo] branch master updated: [NEMO-87] Remove unused BlockStates (#49)

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 88b7933  [NEMO-87] Remove unused BlockStates (#49)
88b7933 is described below

commit 88b793372b528f6676261fddd055f3f8eab4e57a
Author: John Yang <jo...@gmail.com>
AuthorDate: Tue Jun 19 11:15:24 2018 +0900

    [NEMO-87] Remove unused BlockStates (#49)
    
    JIRA: [NEMO-87: Remove unused BlockStates](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-87)
    
    **Major changes:**
    - READY, LOST_BEFORE_COMMIT, LOST, REMOVED => NOT_AVAILABLE
    - SCHEDULED => IN_PROGRESS
    - COMMITTED => AVAILABLE
    
    **Minor changes to note:**
    - N/A
    
    **Tests for the changes:**
    - N/A (no new feature)
    
    **Other comments:**
    - N/A
    
    resolves [NEMO-87](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-87)
---
 .../common/exception/AbsentBlockException.java     |  2 +-
 .../snu/nemo/runtime/common/state/BlockState.java  | 38 +++++--------
 runtime/common/src/main/proto/ControlMessage.proto |  9 +--
 .../runtime/executor/data/BlockManagerWorker.java  |  4 +-
 .../nemo/runtime/executor/data/BlockStoreTest.java | 12 ++--
 .../nemo/runtime/master/BlockManagerMaster.java    | 64 ++++++++--------------
 .../edu/snu/nemo/runtime/master/BlockMetadata.java |  8 +--
 .../runtime/master/BlockManagerMasterTest.java     | 31 +++++------
 8 files changed, 66 insertions(+), 102 deletions(-)

diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java
index f647445..bc7e5e5 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.common.exception;
 import edu.snu.nemo.runtime.common.state.BlockState;
 
 /**
- * An exception which represents the requested block is neither COMMITTED nor SCHEDULED.
+ * An exception which represents the requested block is neither AVAILABLE nor IN_PROGRESS.
  */
 public final class AbsentBlockException extends Exception {
   private final String blockId;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
index 2402948..65ada21 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
@@ -31,29 +31,22 @@ public final class BlockState {
     final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
 
     // Add states
-    stateMachineBuilder.addState(State.READY, "The block is ready to be created.");
-    stateMachineBuilder.addState(State.SCHEDULED, "The block is scheduled for creation.");
-    stateMachineBuilder.addState(State.COMMITTED, "The block has been committed.");
-    stateMachineBuilder.addState(State.LOST_BEFORE_COMMIT, "The task that produces the block is scheduled, "
-        + "but failed before committing");
-    stateMachineBuilder.addState(State.REMOVED, "The block has been removed (e.g., GC-ed).");
-    stateMachineBuilder.addState(State.LOST, "Block lost.");
+    stateMachineBuilder.addState(State.NOT_AVAILABLE, "The block is not available.");
+    stateMachineBuilder.addState(State.IN_PROGRESS, "The block is in the progress of being created.");
+    stateMachineBuilder.addState(State.AVAILABLE, "The block is available.");
 
     // Add transitions
-    stateMachineBuilder.addTransition(State.READY, State.SCHEDULED,
+    stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.IN_PROGRESS,
         "The task that produces the block is scheduled.");
-    stateMachineBuilder.addTransition(State.SCHEDULED, State.COMMITTED, "Successfully moved and committed");
-    stateMachineBuilder.addTransition(State.SCHEDULED, State.LOST_BEFORE_COMMIT, "The block is lost before commit");
-    stateMachineBuilder.addTransition(State.COMMITTED, State.LOST, "Lost after committed");
-    stateMachineBuilder.addTransition(State.COMMITTED, State.REMOVED, "Removed after committed");
-    stateMachineBuilder.addTransition(State.REMOVED, State.SCHEDULED,
-        "Re-scheduled after removal due to fault tolerance");
+    stateMachineBuilder.addTransition(State.IN_PROGRESS, State.AVAILABLE, "The block is successfully created");
 
-    stateMachineBuilder.addTransition(State.LOST, State.SCHEDULED, "The producer of the lost block is rescheduled");
-    stateMachineBuilder.addTransition(State.LOST_BEFORE_COMMIT, State.SCHEDULED,
-        "The producer of the lost block is rescheduled");
+    stateMachineBuilder.addTransition(State.IN_PROGRESS, State.NOT_AVAILABLE,
+        "The block is lost before being created");
+    stateMachineBuilder.addTransition(State.AVAILABLE, State.NOT_AVAILABLE, "The block is lost");
+    stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.NOT_AVAILABLE,
+        "A block can be reported lost from multiple sources");
 
-    stateMachineBuilder.setInitialState(State.READY);
+    stateMachineBuilder.setInitialState(State.NOT_AVAILABLE);
 
     return stateMachineBuilder.build();
   }
@@ -66,12 +59,9 @@ public final class BlockState {
    * BlockState.
    */
   public enum State {
-    READY,
-    SCHEDULED,
-    COMMITTED,
-    LOST_BEFORE_COMMIT,
-    LOST,
-    REMOVED
+    NOT_AVAILABLE,
+    IN_PROGRESS,
+    AVAILABLE,
   }
 
   @Override
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index cc883dc..664734b 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -141,12 +141,9 @@ enum TaskStateFromExecutor {
 }
 
 enum BlockStateFromExecutor {
-    BLOCK_READY = 0;
-    SCHEDULED = 1;
-    COMMITTED = 2;
-    LOST = 3;
-    LOST_BEFORE_COMMIT = 4;
-    REMOVED = 5;
+    NOT_AVAILABLE = 0;
+    IN_PROGRESS = 1;
+    AVAILABLE = 2;
 }
 
 enum BlockStore {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index 41446c1..86a18cb 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -287,7 +287,7 @@ public final class BlockManagerWorker {
         ControlMessage.BlockStateChangedMsg.newBuilder()
             .setExecutorId(executorId)
             .setBlockId(blockId)
-            .setState(ControlMessage.BlockStateFromExecutor.COMMITTED);
+            .setState(ControlMessage.BlockStateFromExecutor.AVAILABLE);
 
     if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
       blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
@@ -345,7 +345,7 @@ public final class BlockManagerWorker {
           ControlMessage.BlockStateChangedMsg.newBuilder()
               .setExecutorId(executorId)
               .setBlockId(blockId)
-              .setState(ControlMessage.BlockStateFromExecutor.REMOVED);
+              .setState(ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE);
 
       if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
         blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 48ee45e..74c2504 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -129,7 +129,7 @@ public final class BlockStoreTest {
       blockIdList.add(blockId);
       blockManagerMaster.initializeState(blockId, "Unused");
       blockManagerMaster.onBlockStateChanged(
-          blockId, BlockState.State.SCHEDULED, null);
+          blockId, BlockState.State.IN_PROGRESS, null);
 
       // Create blocks for this block.
       final List<NonSerializedPartition<Integer>> partitionsForBlock = new ArrayList<>(NUM_READ_VERTICES);
@@ -150,7 +150,7 @@ public final class BlockStoreTest {
     concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1);
     blockManagerMaster.initializeState(concBlockId, "unused");
     blockManagerMaster.onBlockStateChanged(
-        concBlockId, BlockState.State.SCHEDULED, null);
+        concBlockId, BlockState.State.IN_PROGRESS, null);
     IntStream.range(0, NUM_CONC_READ_TASKS).forEach(number -> concReadTaskIdList.add("conc_read_IR_vertex"));
     concBlockPartition = new NonSerializedPartition(0, getRangedNumList(0, CONC_READ_DATA_SIZE), -1, -1);
 
@@ -175,7 +175,7 @@ public final class BlockStoreTest {
       hashedBlockIdList.add(blockId);
       blockManagerMaster.initializeState(blockId, "Unused");
       blockManagerMaster.onBlockStateChanged(
-          blockId, BlockState.State.SCHEDULED, null);
+          blockId, BlockState.State.IN_PROGRESS, null);
       final List<NonSerializedPartition<Integer>> hashedBlock = new ArrayList<>(HASH_RANGE);
       // Generates the data having each hash value.
       IntStream.range(0, HASH_RANGE).forEach(hashValue ->
@@ -319,7 +319,7 @@ public final class BlockStoreTest {
               }
               block.commit();
               writerSideStore.writeBlock(block);
-              blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.COMMITTED,
+              blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE,
                   "Writer side of the shuffle edge");
               return true;
             } catch (final Exception e) {
@@ -413,7 +413,7 @@ public final class BlockStoreTest {
           block.commit();
           writerSideStore.writeBlock(block);
           blockManagerMaster.onBlockStateChanged(
-              concBlockId, BlockState.State.COMMITTED, "Writer side of the concurrent read edge");
+              concBlockId, BlockState.State.AVAILABLE, "Writer side of the concurrent read edge");
           return true;
         } catch (final Exception e) {
           e.printStackTrace();
@@ -501,7 +501,7 @@ public final class BlockStoreTest {
               }
               block.commit();
               writerSideStore.writeBlock(block);
-              blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.COMMITTED,
+              blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE,
                   "Writer side of the shuffle in hash range edge");
               return true;
             } catch (final Exception e) {
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 7b684a3..9c24671 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -42,7 +42,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static edu.snu.nemo.runtime.common.state.BlockState.State.SCHEDULED;
+import static edu.snu.nemo.runtime.common.state.BlockState.State.IN_PROGRESS;
 
 /**
  * Master-side block manager.
@@ -109,7 +109,7 @@ public final class BlockManagerMaster {
     try {
       // Set committed block states to lost
       getCommittedBlocksByWorker(executorId).forEach(blockId -> {
-        onBlockStateChanged(blockId, BlockState.State.LOST, executorId);
+        onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, executorId);
         // producerTaskForPartition should always be non-empty.
         final Set<String> producerTaskForPartition = getProducerTaskIds(blockId);
         producerTaskForPartition.forEach(tasksToRecompute::add);
@@ -126,7 +126,7 @@ public final class BlockManagerMaster {
    *
    * @param blockId id of the specified block.
    * @return the handler of block location requests, which completes exceptionally when the block
-   * is not {@code SCHEDULED} or {@code COMMITTED}.
+   * is not {@code IN_PROGRESS} or {@code AVAILABLE}.
    */
   public BlockLocationRequestHandler getBlockLocationHandler(final String blockId) {
     final Lock readLock = lock.readLock();
@@ -135,13 +135,10 @@ public final class BlockManagerMaster {
       final BlockState.State state =
           (BlockState.State) getBlockState(blockId).getStateMachine().getCurrentState();
       switch (state) {
-        case SCHEDULED:
-        case COMMITTED:
+        case IN_PROGRESS:
+        case AVAILABLE:
           return blockIdToMetadata.get(blockId).getLocationHandler();
-        case READY:
-        case LOST_BEFORE_COMMIT:
-        case LOST:
-        case REMOVED:
+        case NOT_AVAILABLE:
           final BlockLocationRequestHandler handler = new BlockLocationRequestHandler(blockId);
           handler.completeExceptionally(new AbsentBlockException(blockId, state));
           return handler;
@@ -191,8 +188,8 @@ public final class BlockManagerMaster {
       if (producerTaskIdToBlockIds.containsKey(scheduledTaskId)) {
         producerTaskIdToBlockIds.get(scheduledTaskId).forEach(blockId -> {
           if (!blockIdToMetadata.get(blockId).getBlockState()
-              .getStateMachine().getCurrentState().equals(SCHEDULED)) {
-            onBlockStateChanged(blockId, SCHEDULED, null);
+              .getStateMachine().getCurrentState().equals(IN_PROGRESS)) {
+            onBlockStateChanged(blockId, IN_PROGRESS, null);
           }
         });
       } // else this task does not produce any block
@@ -216,13 +213,8 @@ public final class BlockManagerMaster {
         producerTaskIdToBlockIds.get(failedTaskId).forEach(blockId -> {
           final BlockState.State state = (BlockState.State)
               blockIdToMetadata.get(blockId).getBlockState().getStateMachine().getCurrentState();
-          if (state == BlockState.State.COMMITTED) {
-            LOG.info("Partition lost: {}", blockId);
-            onBlockStateChanged(blockId, BlockState.State.LOST, null);
-          } else {
-            LOG.info("Partition lost_before_commit: {}", blockId);
-            onBlockStateChanged(blockId, BlockState.State.LOST_BEFORE_COMMIT, null);
-          }
+          LOG.info("Partition lost: {}", blockId);
+          onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, null);
         });
       } // else this task does not produce any block
     } finally {
@@ -444,18 +436,12 @@ public final class BlockManagerMaster {
    */
   public static BlockState.State convertBlockState(final ControlMessage.BlockStateFromExecutor state) {
     switch (state) {
-      case BLOCK_READY:
-        return BlockState.State.READY;
-      case SCHEDULED:
-        return BlockState.State.SCHEDULED;
-      case COMMITTED:
-        return BlockState.State.COMMITTED;
-      case LOST_BEFORE_COMMIT:
-        return BlockState.State.LOST_BEFORE_COMMIT;
-      case LOST:
-        return BlockState.State.LOST;
-      case REMOVED:
-        return BlockState.State.REMOVED;
+      case NOT_AVAILABLE:
+        return BlockState.State.NOT_AVAILABLE;
+      case IN_PROGRESS:
+        return BlockState.State.IN_PROGRESS;
+      case AVAILABLE:
+        return BlockState.State.AVAILABLE;
       default:
         throw new UnknownExecutionStateException(new Exception("This BlockState is unknown: " + state));
     }
@@ -468,18 +454,12 @@ public final class BlockManagerMaster {
    */
   public static ControlMessage.BlockStateFromExecutor convertBlockState(final BlockState.State state) {
     switch (state) {
-      case READY:
-        return ControlMessage.BlockStateFromExecutor.BLOCK_READY;
-      case SCHEDULED:
-        return ControlMessage.BlockStateFromExecutor.SCHEDULED;
-      case COMMITTED:
-        return ControlMessage.BlockStateFromExecutor.COMMITTED;
-      case LOST_BEFORE_COMMIT:
-        return ControlMessage.BlockStateFromExecutor.LOST_BEFORE_COMMIT;
-      case LOST:
-        return ControlMessage.BlockStateFromExecutor.LOST;
-      case REMOVED:
-        return ControlMessage.BlockStateFromExecutor.REMOVED;
+      case NOT_AVAILABLE:
+        return ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE;
+      case IN_PROGRESS:
+        return ControlMessage.BlockStateFromExecutor.IN_PROGRESS;
+      case AVAILABLE:
+        return ControlMessage.BlockStateFromExecutor.AVAILABLE;
       default:
         throw new UnknownExecutionStateException(new Exception("This BlockState is unknown: " + state));
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
index c2fc732..c5957db 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
@@ -60,19 +60,17 @@ final class BlockMetadata {
     LOG.debug("Block State Transition: id {} from {} to {}", new Object[]{blockId, oldState, newState});
 
     switch (newState) {
-      case SCHEDULED:
+      case IN_PROGRESS:
         stateMachine.setState(newState);
         break;
-      case LOST:
+      case NOT_AVAILABLE:
         LOG.info("Block {} lost in {}", new Object[]{blockId, location});
-      case LOST_BEFORE_COMMIT:
-      case REMOVED:
         // Reset the block location and committer information.
         locationHandler.completeExceptionally(new AbsentBlockException(blockId, newState));
         locationHandler = new BlockManagerMaster.BlockLocationRequestHandler(blockId);
         stateMachine.setState(newState);
         break;
-      case COMMITTED:
+      case AVAILABLE:
         assert (location != null);
         locationHandler.complete(location);
         stateMachine.setState(newState);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index 4a0733d..a465374 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -21,7 +21,6 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.state.BlockState;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.junit.Before;
@@ -88,25 +87,25 @@ public final class BlockManagerMasterTest {
     final String executorId = RuntimeIdGenerator.generateExecutorId();
     final String blockId = RuntimeIdGenerator.generateBlockId(edgeId, srcTaskIndex);
 
-    // Initially the block state is READY.
+    // Initially the block state is NOT_AVAILABLE.
     blockManagerMaster.initializeState(blockId, taskId);
     checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
-        BlockState.State.READY);
+        BlockState.State.NOT_AVAILABLE);
 
-    // The block is being SCHEDULED.
+    // The block is now IN_PROGRESS.
     blockManagerMaster.onProducerTaskScheduled(taskId);
     final Future<String> future = blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture();
     checkPendingFuture(future);
 
-    // The block is COMMITTED
-    blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.COMMITTED, executorId);
-    checkBlockLocation(future, executorId); // A future, previously pending on SCHEDULED state, is now resolved.
+    // The block is AVAILABLE
+    blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE, executorId);
+    checkBlockLocation(future, executorId); // A future, previously pending on IN_PROGRESS state, is now resolved.
     checkBlockLocation(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), executorId);
 
-    // We LOST the block.
+    // We lost the block.
     blockManagerMaster.removeWorker(executorId);
     checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
-        BlockState.State.LOST);
+        BlockState.State.NOT_AVAILABLE);
   }
 
   /**
@@ -130,10 +129,10 @@ public final class BlockManagerMasterTest {
     // Producer task fails.
     blockManagerMaster.onProducerTaskFailed(taskId);
 
-    // A future, previously pending on SCHEDULED state, is now completed exceptionally.
-    checkBlockAbsentException(future0, blockId, BlockState.State.LOST_BEFORE_COMMIT);
+    // A future, previously pending on IN_PROGRESS state, is now completed exceptionally.
+    checkBlockAbsentException(future0, blockId, BlockState.State.NOT_AVAILABLE);
     checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
-        BlockState.State.LOST_BEFORE_COMMIT);
+        BlockState.State.NOT_AVAILABLE);
 
     // Re-scheduling the task.
     blockManagerMaster.onProducerTaskScheduled(taskId);
@@ -141,13 +140,13 @@ public final class BlockManagerMasterTest {
     checkPendingFuture(future1);
 
     // Committed.
-    blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.COMMITTED, executorId);
-    checkBlockLocation(future1, executorId); // A future, previously pending on SCHEDULED state, is now resolved.
+    blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE, executorId);
+    checkBlockLocation(future1, executorId); // A future, previously pending on IN_PROGRESS state, is now resolved.
     checkBlockLocation(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), executorId);
 
     // Then removed.
-    blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.REMOVED, executorId);
+    blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, executorId);
     checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
-        BlockState.State.REMOVED);
+        BlockState.State.NOT_AVAILABLE);
   }
 }