You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/06/19 02:15:26 UTC
[incubator-nemo] branch master updated: [NEMO-87] Remove unused BlockStates (#49)
This is an automated email from the ASF dual-hosted git repository.
jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 88b7933 [NEMO-87] Remove unused BlockStates (#49)
88b7933 is described below
commit 88b793372b528f6676261fddd055f3f8eab4e57a
Author: John Yang <jo...@gmail.com>
AuthorDate: Tue Jun 19 11:15:24 2018 +0900
[NEMO-87] Remove unused BlockStates (#49)
JIRA: [NEMO-87: Remove unused BlockStates](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-87)
**Major changes:**
- READY, LOST_BEFORE_COMMIT, LOST, REMOVED => NOT_AVAILABLE
- SCHEDULED => IN_PROGRESS
- COMMITTED => AVAILABLE
**Minor changes to note:**
- N/A
**Tests for the changes:**
- N/A (no new feature)
**Other comments:**
- N/A
resolves [NEMO-87](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-87)
---
.../common/exception/AbsentBlockException.java | 2 +-
.../snu/nemo/runtime/common/state/BlockState.java | 38 +++++--------
runtime/common/src/main/proto/ControlMessage.proto | 9 +--
.../runtime/executor/data/BlockManagerWorker.java | 4 +-
.../nemo/runtime/executor/data/BlockStoreTest.java | 12 ++--
.../nemo/runtime/master/BlockManagerMaster.java | 64 ++++++++--------------
.../edu/snu/nemo/runtime/master/BlockMetadata.java | 8 +--
.../runtime/master/BlockManagerMasterTest.java | 31 +++++------
8 files changed, 66 insertions(+), 102 deletions(-)
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java
index f647445..bc7e5e5 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.common.exception;
import edu.snu.nemo.runtime.common.state.BlockState;
/**
- * An exception which represents the requested block is neither COMMITTED nor SCHEDULED.
+ * An exception which represents the requested block is neither AVAILABLE nor IN_PROGRESS.
*/
public final class AbsentBlockException extends Exception {
private final String blockId;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
index 2402948..65ada21 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
@@ -31,29 +31,22 @@ public final class BlockState {
final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
// Add states
- stateMachineBuilder.addState(State.READY, "The block is ready to be created.");
- stateMachineBuilder.addState(State.SCHEDULED, "The block is scheduled for creation.");
- stateMachineBuilder.addState(State.COMMITTED, "The block has been committed.");
- stateMachineBuilder.addState(State.LOST_BEFORE_COMMIT, "The task that produces the block is scheduled, "
- + "but failed before committing");
- stateMachineBuilder.addState(State.REMOVED, "The block has been removed (e.g., GC-ed).");
- stateMachineBuilder.addState(State.LOST, "Block lost.");
+ stateMachineBuilder.addState(State.NOT_AVAILABLE, "The block is not available.");
+ stateMachineBuilder.addState(State.IN_PROGRESS, "The block is in the progress of being created.");
+ stateMachineBuilder.addState(State.AVAILABLE, "The block is available.");
// Add transitions
- stateMachineBuilder.addTransition(State.READY, State.SCHEDULED,
+ stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.IN_PROGRESS,
"The task that produces the block is scheduled.");
- stateMachineBuilder.addTransition(State.SCHEDULED, State.COMMITTED, "Successfully moved and committed");
- stateMachineBuilder.addTransition(State.SCHEDULED, State.LOST_BEFORE_COMMIT, "The block is lost before commit");
- stateMachineBuilder.addTransition(State.COMMITTED, State.LOST, "Lost after committed");
- stateMachineBuilder.addTransition(State.COMMITTED, State.REMOVED, "Removed after committed");
- stateMachineBuilder.addTransition(State.REMOVED, State.SCHEDULED,
- "Re-scheduled after removal due to fault tolerance");
+ stateMachineBuilder.addTransition(State.IN_PROGRESS, State.AVAILABLE, "The block is successfully created");
- stateMachineBuilder.addTransition(State.LOST, State.SCHEDULED, "The producer of the lost block is rescheduled");
- stateMachineBuilder.addTransition(State.LOST_BEFORE_COMMIT, State.SCHEDULED,
- "The producer of the lost block is rescheduled");
+ stateMachineBuilder.addTransition(State.IN_PROGRESS, State.NOT_AVAILABLE,
+ "The block is lost before being created");
+ stateMachineBuilder.addTransition(State.AVAILABLE, State.NOT_AVAILABLE, "The block is lost");
+ stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.NOT_AVAILABLE,
+ "A block can be reported lost from multiple sources");
- stateMachineBuilder.setInitialState(State.READY);
+ stateMachineBuilder.setInitialState(State.NOT_AVAILABLE);
return stateMachineBuilder.build();
}
@@ -66,12 +59,9 @@ public final class BlockState {
* BlockState.
*/
public enum State {
- READY,
- SCHEDULED,
- COMMITTED,
- LOST_BEFORE_COMMIT,
- LOST,
- REMOVED
+ NOT_AVAILABLE,
+ IN_PROGRESS,
+ AVAILABLE,
}
@Override
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index cc883dc..664734b 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -141,12 +141,9 @@ enum TaskStateFromExecutor {
}
enum BlockStateFromExecutor {
- BLOCK_READY = 0;
- SCHEDULED = 1;
- COMMITTED = 2;
- LOST = 3;
- LOST_BEFORE_COMMIT = 4;
- REMOVED = 5;
+ NOT_AVAILABLE = 0;
+ IN_PROGRESS = 1;
+ AVAILABLE = 2;
}
enum BlockStore {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index 41446c1..86a18cb 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -287,7 +287,7 @@ public final class BlockManagerWorker {
ControlMessage.BlockStateChangedMsg.newBuilder()
.setExecutorId(executorId)
.setBlockId(blockId)
- .setState(ControlMessage.BlockStateFromExecutor.COMMITTED);
+ .setState(ControlMessage.BlockStateFromExecutor.AVAILABLE);
if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
@@ -345,7 +345,7 @@ public final class BlockManagerWorker {
ControlMessage.BlockStateChangedMsg.newBuilder()
.setExecutorId(executorId)
.setBlockId(blockId)
- .setState(ControlMessage.BlockStateFromExecutor.REMOVED);
+ .setState(ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE);
if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 48ee45e..74c2504 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -129,7 +129,7 @@ public final class BlockStoreTest {
blockIdList.add(blockId);
blockManagerMaster.initializeState(blockId, "Unused");
blockManagerMaster.onBlockStateChanged(
- blockId, BlockState.State.SCHEDULED, null);
+ blockId, BlockState.State.IN_PROGRESS, null);
// Create blocks for this block.
final List<NonSerializedPartition<Integer>> partitionsForBlock = new ArrayList<>(NUM_READ_VERTICES);
@@ -150,7 +150,7 @@ public final class BlockStoreTest {
concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1);
blockManagerMaster.initializeState(concBlockId, "unused");
blockManagerMaster.onBlockStateChanged(
- concBlockId, BlockState.State.SCHEDULED, null);
+ concBlockId, BlockState.State.IN_PROGRESS, null);
IntStream.range(0, NUM_CONC_READ_TASKS).forEach(number -> concReadTaskIdList.add("conc_read_IR_vertex"));
concBlockPartition = new NonSerializedPartition(0, getRangedNumList(0, CONC_READ_DATA_SIZE), -1, -1);
@@ -175,7 +175,7 @@ public final class BlockStoreTest {
hashedBlockIdList.add(blockId);
blockManagerMaster.initializeState(blockId, "Unused");
blockManagerMaster.onBlockStateChanged(
- blockId, BlockState.State.SCHEDULED, null);
+ blockId, BlockState.State.IN_PROGRESS, null);
final List<NonSerializedPartition<Integer>> hashedBlock = new ArrayList<>(HASH_RANGE);
// Generates the data having each hash value.
IntStream.range(0, HASH_RANGE).forEach(hashValue ->
@@ -319,7 +319,7 @@ public final class BlockStoreTest {
}
block.commit();
writerSideStore.writeBlock(block);
- blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.COMMITTED,
+ blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE,
"Writer side of the shuffle edge");
return true;
} catch (final Exception e) {
@@ -413,7 +413,7 @@ public final class BlockStoreTest {
block.commit();
writerSideStore.writeBlock(block);
blockManagerMaster.onBlockStateChanged(
- concBlockId, BlockState.State.COMMITTED, "Writer side of the concurrent read edge");
+ concBlockId, BlockState.State.AVAILABLE, "Writer side of the concurrent read edge");
return true;
} catch (final Exception e) {
e.printStackTrace();
@@ -501,7 +501,7 @@ public final class BlockStoreTest {
}
block.commit();
writerSideStore.writeBlock(block);
- blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.COMMITTED,
+ blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE,
"Writer side of the shuffle in hash range edge");
return true;
} catch (final Exception e) {
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 7b684a3..9c24671 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -42,7 +42,7 @@ import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static edu.snu.nemo.runtime.common.state.BlockState.State.SCHEDULED;
+import static edu.snu.nemo.runtime.common.state.BlockState.State.IN_PROGRESS;
/**
* Master-side block manager.
@@ -109,7 +109,7 @@ public final class BlockManagerMaster {
try {
// Set committed block states to lost
getCommittedBlocksByWorker(executorId).forEach(blockId -> {
- onBlockStateChanged(blockId, BlockState.State.LOST, executorId);
+ onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, executorId);
// producerTaskForPartition should always be non-empty.
final Set<String> producerTaskForPartition = getProducerTaskIds(blockId);
producerTaskForPartition.forEach(tasksToRecompute::add);
@@ -126,7 +126,7 @@ public final class BlockManagerMaster {
*
* @param blockId id of the specified block.
* @return the handler of block location requests, which completes exceptionally when the block
- * is not {@code SCHEDULED} or {@code COMMITTED}.
+ * is not {@code IN_PROGRESS} or {@code AVAILABLE}.
*/
public BlockLocationRequestHandler getBlockLocationHandler(final String blockId) {
final Lock readLock = lock.readLock();
@@ -135,13 +135,10 @@ public final class BlockManagerMaster {
final BlockState.State state =
(BlockState.State) getBlockState(blockId).getStateMachine().getCurrentState();
switch (state) {
- case SCHEDULED:
- case COMMITTED:
+ case IN_PROGRESS:
+ case AVAILABLE:
return blockIdToMetadata.get(blockId).getLocationHandler();
- case READY:
- case LOST_BEFORE_COMMIT:
- case LOST:
- case REMOVED:
+ case NOT_AVAILABLE:
final BlockLocationRequestHandler handler = new BlockLocationRequestHandler(blockId);
handler.completeExceptionally(new AbsentBlockException(blockId, state));
return handler;
@@ -191,8 +188,8 @@ public final class BlockManagerMaster {
if (producerTaskIdToBlockIds.containsKey(scheduledTaskId)) {
producerTaskIdToBlockIds.get(scheduledTaskId).forEach(blockId -> {
if (!blockIdToMetadata.get(blockId).getBlockState()
- .getStateMachine().getCurrentState().equals(SCHEDULED)) {
- onBlockStateChanged(blockId, SCHEDULED, null);
+ .getStateMachine().getCurrentState().equals(IN_PROGRESS)) {
+ onBlockStateChanged(blockId, IN_PROGRESS, null);
}
});
} // else this task does not produce any block
@@ -216,13 +213,8 @@ public final class BlockManagerMaster {
producerTaskIdToBlockIds.get(failedTaskId).forEach(blockId -> {
final BlockState.State state = (BlockState.State)
blockIdToMetadata.get(blockId).getBlockState().getStateMachine().getCurrentState();
- if (state == BlockState.State.COMMITTED) {
- LOG.info("Partition lost: {}", blockId);
- onBlockStateChanged(blockId, BlockState.State.LOST, null);
- } else {
- LOG.info("Partition lost_before_commit: {}", blockId);
- onBlockStateChanged(blockId, BlockState.State.LOST_BEFORE_COMMIT, null);
- }
+ LOG.info("Partition lost: {}", blockId);
+ onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, null);
});
} // else this task does not produce any block
} finally {
@@ -444,18 +436,12 @@ public final class BlockManagerMaster {
*/
public static BlockState.State convertBlockState(final ControlMessage.BlockStateFromExecutor state) {
switch (state) {
- case BLOCK_READY:
- return BlockState.State.READY;
- case SCHEDULED:
- return BlockState.State.SCHEDULED;
- case COMMITTED:
- return BlockState.State.COMMITTED;
- case LOST_BEFORE_COMMIT:
- return BlockState.State.LOST_BEFORE_COMMIT;
- case LOST:
- return BlockState.State.LOST;
- case REMOVED:
- return BlockState.State.REMOVED;
+ case NOT_AVAILABLE:
+ return BlockState.State.NOT_AVAILABLE;
+ case IN_PROGRESS:
+ return BlockState.State.IN_PROGRESS;
+ case AVAILABLE:
+ return BlockState.State.AVAILABLE;
default:
throw new UnknownExecutionStateException(new Exception("This BlockState is unknown: " + state));
}
@@ -468,18 +454,12 @@ public final class BlockManagerMaster {
*/
public static ControlMessage.BlockStateFromExecutor convertBlockState(final BlockState.State state) {
switch (state) {
- case READY:
- return ControlMessage.BlockStateFromExecutor.BLOCK_READY;
- case SCHEDULED:
- return ControlMessage.BlockStateFromExecutor.SCHEDULED;
- case COMMITTED:
- return ControlMessage.BlockStateFromExecutor.COMMITTED;
- case LOST_BEFORE_COMMIT:
- return ControlMessage.BlockStateFromExecutor.LOST_BEFORE_COMMIT;
- case LOST:
- return ControlMessage.BlockStateFromExecutor.LOST;
- case REMOVED:
- return ControlMessage.BlockStateFromExecutor.REMOVED;
+ case NOT_AVAILABLE:
+ return ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE;
+ case IN_PROGRESS:
+ return ControlMessage.BlockStateFromExecutor.IN_PROGRESS;
+ case AVAILABLE:
+ return ControlMessage.BlockStateFromExecutor.AVAILABLE;
default:
throw new UnknownExecutionStateException(new Exception("This BlockState is unknown: " + state));
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
index c2fc732..c5957db 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
@@ -60,19 +60,17 @@ final class BlockMetadata {
LOG.debug("Block State Transition: id {} from {} to {}", new Object[]{blockId, oldState, newState});
switch (newState) {
- case SCHEDULED:
+ case IN_PROGRESS:
stateMachine.setState(newState);
break;
- case LOST:
+ case NOT_AVAILABLE:
LOG.info("Block {} lost in {}", new Object[]{blockId, location});
- case LOST_BEFORE_COMMIT:
- case REMOVED:
// Reset the block location and committer information.
locationHandler.completeExceptionally(new AbsentBlockException(blockId, newState));
locationHandler = new BlockManagerMaster.BlockLocationRequestHandler(blockId);
stateMachine.setState(newState);
break;
- case COMMITTED:
+ case AVAILABLE:
assert (location != null);
locationHandler.complete(location);
stateMachine.setState(newState);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index 4a0733d..a465374 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -21,7 +21,6 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
import edu.snu.nemo.runtime.common.state.BlockState;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.junit.Before;
@@ -88,25 +87,25 @@ public final class BlockManagerMasterTest {
final String executorId = RuntimeIdGenerator.generateExecutorId();
final String blockId = RuntimeIdGenerator.generateBlockId(edgeId, srcTaskIndex);
- // Initially the block state is READY.
+ // Initially the block state is NOT_AVAILABLE.
blockManagerMaster.initializeState(blockId, taskId);
checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
- BlockState.State.READY);
+ BlockState.State.NOT_AVAILABLE);
- // The block is being SCHEDULED.
+ // The block is being IN_PROGRESS.
blockManagerMaster.onProducerTaskScheduled(taskId);
final Future<String> future = blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture();
checkPendingFuture(future);
- // The block is COMMITTED
- blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.COMMITTED, executorId);
- checkBlockLocation(future, executorId); // A future, previously pending on SCHEDULED state, is now resolved.
+ // The block is AVAILABLE
+ blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE, executorId);
+ checkBlockLocation(future, executorId); // A future, previously pending on IN_PROGRESS state, is now resolved.
checkBlockLocation(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), executorId);
- // We LOST the block.
+ // We lost the block.
blockManagerMaster.removeWorker(executorId);
checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
- BlockState.State.LOST);
+ BlockState.State.NOT_AVAILABLE);
}
/**
@@ -130,10 +129,10 @@ public final class BlockManagerMasterTest {
// Producer task fails.
blockManagerMaster.onProducerTaskFailed(taskId);
- // A future, previously pending on SCHEDULED state, is now completed exceptionally.
- checkBlockAbsentException(future0, blockId, BlockState.State.LOST_BEFORE_COMMIT);
+ // A future, previously pending on IN_PROGRESS state, is now completed exceptionally.
+ checkBlockAbsentException(future0, blockId, BlockState.State.NOT_AVAILABLE);
checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
- BlockState.State.LOST_BEFORE_COMMIT);
+ BlockState.State.NOT_AVAILABLE);
// Re-scheduling the task.
blockManagerMaster.onProducerTaskScheduled(taskId);
@@ -141,13 +140,13 @@ public final class BlockManagerMasterTest {
checkPendingFuture(future1);
// Committed.
- blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.COMMITTED, executorId);
- checkBlockLocation(future1, executorId); // A future, previously pending on SCHEDULED state, is now resolved.
+ blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE, executorId);
+ checkBlockLocation(future1, executorId); // A future, previously pending on IN_PROGRESS state, is now resolved.
checkBlockLocation(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), executorId);
// Then removed.
- blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.REMOVED, executorId);
+ blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, executorId);
checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
- BlockState.State.REMOVED);
+ BlockState.State.NOT_AVAILABLE);
}
}