Posted to commits@nemo.apache.org by jo...@apache.org on 2018/07/03 07:46:36 UTC
[incubator-nemo] branch master updated: [NEMO-125] Fix data loss
bug caused by SailfishSchedulingPolicy (#68)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new c3bd780 [NEMO-125] Fix data loss bug caused by SailfishSchedulingPolicy (#68)
c3bd780 is described below
commit c3bd780af7062b380cde4a291e8f978f3da90ad9
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Tue Jul 3 16:46:34 2018 +0900
[NEMO-125] Fix data loss bug caused by SailfishSchedulingPolicy (#68)
JIRA: [NEMO-125: Fix data loss bug caused by SailfishSchedulingPolicy](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-125)
**Major changes:**
- Let `InputStreamIterator` in `DataUtil` limit the data to be read by the byte size of each partition instead of by the number of elements.
- This "limiting" behavior is needed to avoid reading padding bytes introduced by compression or other `Stream` implementations (such as `ByteArrayOutputStream`); see the sketch after this list.
- Remove the number of elements in a partition from partition metadata.
- Re-enable byte array coder optimization in `SailfishPass`.
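To illustrate the byte-size limit, here is a minimal sketch (not the actual `InputStreamIterator` code; it assumes fixed-size `int` elements and a hypothetical `readPartition` helper): decoding stops once `partitionSize` bytes have been consumed, so trailing padding bytes are never decoded as extra elements.
```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class ByteLimitedRead {
  private ByteLimitedRead() { }

  /** Decodes ints from {@code data}, stopping after {@code partitionSize} bytes. */
  static List<Integer> readPartition(final byte[] data, final int partitionSize) throws IOException {
    final List<Integer> elements = new ArrayList<>();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int bytesConsumed = 0;
      while (bytesConsumed < partitionSize) { // limit by bytes read, not by element count
        elements.add(in.readInt());           // decode one element
        bytesConsumed += Integer.BYTES;       // track how many payload bytes were consumed
      }
    }
    return elements;
  }
}
```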
**Minor changes to note:**
- Make `DataTransferFactory` constructible only through Tang (instead of through a public constructor).
- Make `OutputWriter` constructible only through `DataTransferFactory` (see the sketch below).
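With the constructors hidden, callers obtain the factory from a Tang injector and create writers through it, as the updated `DataTransferTest` does. A minimal fragment of the call sequence (variables such as `executorConfiguration`, `srcVertex`, `srcTaskIndex`, `dstVertex`, and `runtimeEdge` are assumed to come from the surrounding setup):
```java
// Sketch: construction goes through Tang and the factory instead of `new`.
final Injector injector = Tang.Factory.getTang().newInjector(executorConfiguration);
final DataTransferFactory factory = injector.getInstance(DataTransferFactory.class);
final OutputWriter writer = factory.createWriter(srcVertex, srcTaskIndex, dstVertex, runtimeEdge);
```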
**Tests for the changes:**
- Existing data plane unit tests cover this change.
- Add `testSailfishInOneExecutor` in `WordCountITCase`.
- This test reproduces the single-executor resource environment that triggers the bug, and fails without the change in this PR.
**Other comments:**
- N/A.
resolves [NEMO-125](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-125)
---
.../pass/compiletime/composite/SailfishPass.java | 5 +--
.../snu/nemo/examples/beam/WordCountITCase.java | 14 ++++++-
.../beam_sample_one_executor_resources.json | 7 ++++
.../snu/nemo/runtime/executor/data/DataUtil.java | 49 ++++++++++------------
.../runtime/executor/data/block/FileBlock.java | 7 ++--
.../executor/data/metadata/FileMetadata.java | 6 +--
.../executor/data/metadata/PartitionMetadata.java | 15 +------
.../executor/data/metadata/RemoteFileMetadata.java | 2 -
.../data/partition/SerializedPartition.java | 18 --------
.../executor/datatransfer/DataTransferFactory.java | 6 +--
.../executor/datatransfer/OutputWriter.java | 12 +++---
.../executor/datatransfer/DataTransferTest.java | 47 ++++++++++++---------
.../compiletime/composite/SailfishPassTest.java | 10 ++---
13 files changed, 90 insertions(+), 108 deletions(-)
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
index ab1ece1..d5c50e4 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
@@ -32,9 +32,8 @@ public final class SailfishPass extends CompositePass {
new SailfishRelayReshapingPass(),
new SailfishEdgeDataFlowModelPass(),
new SailfishEdgeDataStorePass(),
- // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
- // new SailfishEdgeDecoderPass(),
- // new SailfishEdgeEncoderPass(),
+ new SailfishEdgeDecoderPass(),
+ new SailfishEdgeEncoderPass(),
new SailfishEdgeUsedDataHandlingPass()
));
}
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
index 507d365..0ffba2e 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
@@ -40,13 +40,13 @@ public final class WordCountITCase {
private static final String outputFileName = "sample_output_wordcount";
private static final String testResourceFileName = "test_output_wordcount";
private static final String executorResourceFileName = fileBasePath + "beam_sample_executor_resources.json";
+ private static final String oneExecutorResourceFileName = fileBasePath + "beam_sample_one_executor_resources.json";
private static final String inputFilePath = fileBasePath + inputFileName;
private static final String outputFilePath = fileBasePath + outputFileName;
@Before
public void setUp() throws Exception {
builder = new ArgBuilder()
- .addResourceJson(executorResourceFileName)
.addUserMain(WordCount.class.getCanonicalName())
.addUserArgs(inputFilePath, outputFilePath);
}
@@ -63,6 +63,7 @@ public final class WordCountITCase {
@Test (timeout = TIMEOUT)
public void test() throws Exception {
JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
.addJobId(WordCountITCase.class.getSimpleName())
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
.build());
@@ -71,14 +72,25 @@ public final class WordCountITCase {
@Test (timeout = TIMEOUT)
public void testSailfish() throws Exception {
JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
.addJobId(WordCountITCase.class.getSimpleName() + "_sailfish")
.addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
.build());
}
@Test (timeout = TIMEOUT)
+ public void testSailfishInOneExecutor() throws Exception {
+ JobLauncher.main(builder
+ .addResourceJson(oneExecutorResourceFileName)
+ .addJobId(WordCountITCase.class.getSimpleName() + "_sailfishInOneExecutor")
+ .addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
+ .build());
+ }
+
+ @Test (timeout = TIMEOUT)
public void testPado() throws Exception {
JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
.addJobId(WordCountITCase.class.getSimpleName() + "_pado")
.addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
.build());
diff --git a/examples/resources/beam_sample_one_executor_resources.json b/examples/resources/beam_sample_one_executor_resources.json
new file mode 100644
index 0000000..069ed97
--- /dev/null
+++ b/examples/resources/beam_sample_one_executor_resources.json
@@ -0,0 +1,7 @@
+[
+ {
+ "type": "Transient",
+ "memory_mb": 512,
+ "capacity": 5
+ }
+]
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index 69eb9ba..50c43d9 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -49,44 +49,39 @@ public final class DataUtil {
/**
* Serializes the elements in a non-serialized partition into an output stream.
*
- * @param encoderFactory the encoderFactory to encode the elements.
+ * @param encoderFactory the encoderFactory to encode the elements.
* @param nonSerializedPartition the non-serialized partition to serialize.
* @param bytesOutputStream the output stream to write.
- * @return total number of elements in the partition.
* @throws IOException if fail to serialize.
*/
- public static long serializePartition(final EncoderFactory encoderFactory,
- final NonSerializedPartition nonSerializedPartition,
- final OutputStream bytesOutputStream) throws IOException {
- long elementsCount = 0;
+ private static void serializePartition(final EncoderFactory encoderFactory,
+ final NonSerializedPartition nonSerializedPartition,
+ final OutputStream bytesOutputStream) throws IOException {
final EncoderFactory.Encoder encoder = encoderFactory.create(bytesOutputStream);
for (final Object element : nonSerializedPartition.getData()) {
encoder.encode(element);
- elementsCount++;
}
-
- return elementsCount;
}
/**
* Reads the data of a partition from an input stream and deserializes it.
*
- * @param elementsInPartition the number of elements in this partition.
- * @param serializer the serializer to decode the bytes.
- * @param key the key value of the result partition.
- * @param inputStream the input stream which will return the data in the partition as bytes.
- * @param <K> the key type of the partitions.
+ * @param partitionSize the size of the partition to deserialize.
+ * @param serializer the serializer to decode the bytes.
+ * @param key the key value of the result partition.
+ * @param inputStream the input stream which will return the data in the partition as bytes.
+ * @param <K> the key type of the partitions.
* @return the list of deserialized elements.
* @throws IOException if fail to deserialize.
*/
- public static <K extends Serializable> NonSerializedPartition deserializePartition(final long elementsInPartition,
+ public static <K extends Serializable> NonSerializedPartition deserializePartition(final int partitionSize,
final Serializer serializer,
final K key,
final InputStream inputStream)
throws IOException {
final List deserializedData = new ArrayList();
final InputStreamIterator iterator = new InputStreamIterator(Collections.singletonList(inputStream).iterator(),
- serializer, elementsInPartition);
+ serializer, partitionSize);
iterator.forEachRemaining(deserializedData::add);
return new NonSerializedPartition(key, deserializedData, iterator.getNumSerializedBytes(),
iterator.getNumEncodedBytes());
@@ -111,15 +106,14 @@ public final class DataUtil {
final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
final OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
) {
- final long elementsTotal =
- serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
+ serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
// We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
// inner buffer directly, which can be an unfinished(not flushed) buffer.
wrappedStream.close();
final byte[] serializedBytes = bytesOutputStream.getBufDirectly();
final int actualLength = bytesOutputStream.getCount();
serializedPartitions.add(
- new SerializedPartition<>(partitionToConvert.getKey(), elementsTotal, serializedBytes, actualLength));
+ new SerializedPartition<>(partitionToConvert.getKey(), serializedBytes, actualLength));
}
}
return serializedPartitions;
@@ -144,7 +138,7 @@ public final class DataUtil {
try (final ByteArrayInputStream byteArrayInputStream =
new ByteArrayInputStream(partitionToConvert.getData())) {
final NonSerializedPartition<K> deserializePartition = deserializePartition(
- partitionToConvert.getElementsCount(), serializer, key, byteArrayInputStream);
+ partitionToConvert.getLength(), serializer, key, byteArrayInputStream);
nonSerializedPartitions.add(deserializePartition);
}
}
@@ -211,7 +205,6 @@ public final class DataUtil {
private volatile T next;
private volatile boolean cannotContinueDecoding = false;
private volatile DecoderFactory.Decoder<T> decoder = null;
- private volatile long elementsDecoded = 0;
private volatile long numSerializedBytes = 0;
private volatile long numEncodedBytes = 0;
@@ -221,8 +214,8 @@ public final class DataUtil {
* @param inputStreams The streams to read data from.
* @param serializer The serializer.
*/
- public InputStreamIterator(final Iterator<InputStream> inputStreams,
- final Serializer<?, T> serializer) {
+ InputStreamIterator(final Iterator<InputStream> inputStreams,
+ final Serializer<?, T> serializer) {
this.inputStreams = inputStreams;
this.serializer = serializer;
// -1 means no limit.
@@ -234,12 +227,12 @@ public final class DataUtil {
*
* @param inputStreams The streams to read data from.
* @param serializer The serializer.
- * @param limit The number of elements from the {@link InputStream}.
+ * @param limit The bytes to read from the {@link InputStream}.
*/
- public InputStreamIterator(
+ private InputStreamIterator(
final Iterator<InputStream> inputStreams,
final Serializer<?, T> serializer,
- final long limit) {
+ final int limit) {
if (limit < 0) {
throw new IllegalArgumentException("Negative limit not allowed.");
}
@@ -256,7 +249,8 @@ public final class DataUtil {
if (cannotContinueDecoding) {
return false;
}
- if (limit != -1 && limit == elementsDecoded) {
+ if (limit != -1 && limit == (serializedCountingStream == null
+ ? numSerializedBytes : numSerializedBytes + serializedCountingStream.getCount())) {
cannotContinueDecoding = true;
return false;
}
@@ -280,7 +274,6 @@ public final class DataUtil {
try {
next = decoder.decode();
hasNext = true;
- elementsDecoded++;
return true;
} catch (final IOException e) {
// IOException from decoder indicates EOF event.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 9a64d86..41f125b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -78,8 +78,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
try (final FileOutputStream fileOutputStream = new FileOutputStream(filePath, true)) {
for (final SerializedPartition<K> serializedPartition : serializedPartitions) {
// Reserve a partition write and get the metadata.
- metadata.writePartitionMetadata(
- serializedPartition.getKey(), serializedPartition.getLength(), serializedPartition.getElementsCount());
+ metadata.writePartitionMetadata(serializedPartition.getKey(), serializedPartition.getLength());
fileOutputStream.write(serializedPartition.getData(), 0, serializedPartition.getLength());
}
}
@@ -187,7 +186,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
final NonSerializedPartition<K> deserializePartition =
DataUtil.deserializePartition(
- partitionMetadata.getElementsTotal(), serializer, key, limitedInputStream);
+ partitionMetadata.getPartitionSize(), serializer, key, limitedInputStream);
deserializedPartitions.add(deserializePartition);
// rearrange file pointer
final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
@@ -237,7 +236,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
throw new IOException("The read data size does not match with the partition size.");
}
partitionsInRange.add(new SerializedPartition<>(
- key, partitionmetadata.getElementsTotal(), serializedData, serializedData.length));
+ key, serializedData, serializedData.length));
} else {
// Have to skip this partition.
skipBytes(fileStream, partitionmetadata.getPartitionSize());
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
index 3eafe8e..108a4e7 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
@@ -58,18 +58,16 @@ public abstract class FileMetadata<K extends Serializable> {
*
* @param key the key of the partition.
* @param partitionSize the size of the partition.
- * @param elementsTotal the number of elements in the partition.
* @throws IOException if fail to append the partition metadata.
*/
public final synchronized void writePartitionMetadata(final K key,
- final int partitionSize,
- final long elementsTotal) throws IOException {
+ final int partitionSize) throws IOException {
if (committed.get()) {
throw new IOException("Cannot write a new block to a closed partition.");
}
final PartitionMetadata partitionMetadata =
- new PartitionMetadata(key, partitionSize, writtenBytesCursor, elementsTotal);
+ new PartitionMetadata(key, partitionSize, writtenBytesCursor);
partitionMetadataList.add(partitionMetadata);
writtenBytesCursor += partitionSize;
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
index 9d68e5a..2fa9de2 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
@@ -25,7 +25,6 @@ public final class PartitionMetadata<K extends Serializable> {
private final K key;
private final int partitionSize;
private final long offset;
- private final long elementsTotal;
/**
* Constructor.
@@ -33,16 +32,13 @@ public final class PartitionMetadata<K extends Serializable> {
* @param key the key of this partition.
* @param partitionSize the size of this partition.
* @param offset the offset of this partition.
- * @param elementsTotal the total number of elements in this partition.
*/
public PartitionMetadata(final K key,
final int partitionSize,
- final long offset,
- final long elementsTotal) {
+ final long offset) {
this.key = key;
this.partitionSize = partitionSize;
this.offset = offset;
- this.elementsTotal = elementsTotal;
}
/**
@@ -66,13 +62,6 @@ public final class PartitionMetadata<K extends Serializable> {
return offset;
}
- /**
- * @return the total number of elements in this partition.
- */
- public long getElementsTotal() {
- return elementsTotal;
- }
-
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@@ -82,8 +71,6 @@ public final class PartitionMetadata<K extends Serializable> {
sb.append(partitionSize);
sb.append("/ offset: ");
sb.append(offset);
- sb.append("/ elementsTotal: ");
- sb.append(elementsTotal);
return sb.toString();
}
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
index 9a22e43..8ad2126 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
@@ -83,7 +83,6 @@ public final class RemoteFileMetadata<K extends Serializable> extends FileMetada
dataOutputStream.write(key);
dataOutputStream.writeInt(partitionMetadata.getPartitionSize());
dataOutputStream.writeLong(partitionMetadata.getOffset());
- dataOutputStream.writeLong(partitionMetadata.getElementsTotal());
}
}
setCommitted(true);
@@ -127,7 +126,6 @@ public final class RemoteFileMetadata<K extends Serializable> extends FileMetada
final PartitionMetadata<T> partitionMetadata = new PartitionMetadata<>(
SerializationUtils.deserialize(desKey),
dataInputStream.readInt(),
- dataInputStream.readLong(),
dataInputStream.readLong()
);
partitionMetadataList.add(partitionMetadata);
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
index d6d63d1..4015000 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -32,7 +32,6 @@ import static edu.snu.nemo.runtime.executor.data.DataUtil.buildOutputStream;
*/
public final class SerializedPartition<K> implements Partition<byte[], K> {
private final K key;
- private volatile long elementsCount;
private volatile byte[] serializedData;
private volatile int length;
private volatile boolean committed;
@@ -52,7 +51,6 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
public SerializedPartition(final K key,
final Serializer serializer) throws IOException {
this.key = key;
- this.elementsCount = 0;
this.serializedData = new byte[0];
this.length = 0;
this.committed = false;
@@ -66,16 +64,13 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
* Data cannot be written to this partition after the construction.
*
* @param key the key.
- * @param elementsTotal the total number of elements.
* @param serializedData the serialized data.
* @param length the length of the actual serialized data. (It can be different with serializedData.length)
*/
public SerializedPartition(final K key,
- final long elementsTotal,
final byte[] serializedData,
final int length) {
this.key = key;
- this.elementsCount = elementsTotal;
this.serializedData = serializedData;
this.length = length;
this.committed = true;
@@ -97,7 +92,6 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
} else {
try {
encoder.encode(element);
- elementsCount++;
} catch (final IOException e) {
wrappedStream.close();
}
@@ -160,16 +154,4 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
return length;
}
}
-
- /**
- * @return the number of elements.
- * @throws IOException if the partition is not committed yet.
- */
- public long getElementsCount() throws IOException {
- if (!committed) {
- throw new IOException("The partition is not committed yet!");
- } else {
- return elementsCount;
- }
- }
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
index 7a4068e..36db3ec 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
@@ -32,8 +32,8 @@ public final class DataTransferFactory {
private final int hashRangeMultiplier;
@Inject
- public DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) final int hashRangeMultiplier,
- final BlockManagerWorker blockManagerWorker) {
+ private DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) final int hashRangeMultiplier,
+ final BlockManagerWorker blockManagerWorker) {
this.hashRangeMultiplier = hashRangeMultiplier;
this.blockManagerWorker = blockManagerWorker;
}
@@ -41,7 +41,7 @@ public final class DataTransferFactory {
/**
* Creates an {@link OutputWriter} between two stages.
*
- * @param srcIRVertex the {@link IRVertex} that outputs the data to be written.
+ * @param srcIRVertex the {@link IRVertex} that outputs the data to be written.
* @param srcTaskIdx the index of the source task.
* @param dstIRVertex the {@link IRVertex} that will take the output data as its input.
* @param runtimeEdge that connects the srcTask to the tasks belonging to dstIRVertex.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index bb594f6..1744bb2 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -53,12 +53,12 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
* @param runtimeEdge the {@link RuntimeEdge}.
* @param blockManagerWorker the {@link BlockManagerWorker}.
*/
- public OutputWriter(final int hashRangeMultiplier,
- final int srcTaskIdx,
- final String srcRuntimeVertexId,
- final IRVertex dstIrVertex,
- final RuntimeEdge<?> runtimeEdge,
- final BlockManagerWorker blockManagerWorker) {
+ OutputWriter(final int hashRangeMultiplier,
+ final int srcTaskIdx,
+ final String srcRuntimeVertexId,
+ final IRVertex dstIrVertex,
+ final RuntimeEdge<?> runtimeEdge,
+ final BlockManagerWorker blockManagerWorker) {
super(runtimeEdge.getId());
this.blockId = RuntimeIdGenerator.generateBlockId(getId(), srcTaskIdx);
this.runtimeEdge = runtimeEdge;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index df9454a..f801685 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -91,16 +91,21 @@ import static org.mockito.Mockito.mock;
SourceVertex.class, ClientRPC.class, MetricManagerMaster.class})
public final class DataTransferTest {
private static final String EXECUTOR_ID_PREFIX = "Executor";
- private static final InterTaskDataStoreProperty.Value MEMORY_STORE = InterTaskDataStoreProperty.Value.MemoryStore;
- private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE = InterTaskDataStoreProperty.Value.SerializedMemoryStore;
- private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE = InterTaskDataStoreProperty.Value.LocalFileStore;
- private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE = InterTaskDataStoreProperty.Value.GlusterFileStore;
+ private static final InterTaskDataStoreProperty.Value MEMORY_STORE =
+ InterTaskDataStoreProperty.Value.MemoryStore;
+ private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE =
+ InterTaskDataStoreProperty.Value.SerializedMemoryStore;
+ private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE =
+ InterTaskDataStoreProperty.Value.LocalFileStore;
+ private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE =
+ InterTaskDataStoreProperty.Value.GlusterFileStore;
private static final String TMP_LOCAL_FILE_DIRECTORY = "./tmpLocalFiles";
private static final String TMP_REMOTE_FILE_DIRECTORY = "./tmpRemoteFiles";
private static final int PARALLELISM_TEN = 10;
private static final String EDGE_PREFIX_TEMPLATE = "Dummy(%d)";
private static final AtomicInteger TEST_INDEX = new AtomicInteger(0);
- private static final EncoderFactory ENCODER_FACTORY = PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
+ private static final EncoderFactory ENCODER_FACTORY =
+ PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
private static final DecoderFactory DECODER_FACTORY =
PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of());
private static final Tang TANG = Tang.Factory.getTang();
@@ -108,6 +113,7 @@ public final class DataTransferTest {
private BlockManagerMaster master;
private BlockManagerWorker worker1;
+ private DataTransferFactory transferFactory;
private BlockManagerWorker worker2;
private HashMap<BlockManagerWorker, SerializerManager> serializerManagers = new HashMap<>();
@@ -152,10 +158,12 @@ public final class DataTransferTest {
injector2.bindVolatileParameter(JobConf.JobId.class, "data transfer test");
this.master = master;
- this.worker1 = createWorker(EXECUTOR_ID_PREFIX + executorCount.getAndIncrement(), messageDispatcher,
- injector2);
+ final Pair<BlockManagerWorker, DataTransferFactory> pair1 =
+ createWorker(EXECUTOR_ID_PREFIX + executorCount.getAndIncrement(), messageDispatcher, injector2);
+ this.worker1 = pair1.left();
+ this.transferFactory = pair1.right();
this.worker2 = createWorker(EXECUTOR_ID_PREFIX + executorCount.getAndIncrement(), messageDispatcher,
- injector2);
+ injector2).left();
}
@After
@@ -164,8 +172,10 @@ public final class DataTransferTest {
FileUtils.deleteDirectory(new File(TMP_REMOTE_FILE_DIRECTORY));
}
- private BlockManagerWorker createWorker(final String executorId, final LocalMessageDispatcher messageDispatcher,
- final Injector nameClientInjector) {
+ private Pair<BlockManagerWorker, DataTransferFactory> createWorker(
+ final String executorId,
+ final LocalMessageDispatcher messageDispatcher,
+ final Injector nameClientInjector) {
final LocalMessageEnvironment messageEnvironment = new LocalMessageEnvironment(executorId, messageDispatcher);
final PersistentConnectionToMasterMap conToMaster = new PersistentConnectionToMasterMap(messageEnvironment);
final Configuration executorConfiguration = TANG.newConfigurationBuilder()
@@ -180,11 +190,13 @@ public final class DataTransferTest {
final BlockManagerWorker blockManagerWorker;
final MetricManagerWorker metricManagerWorker;
final SerializerManager serializerManager;
+ final DataTransferFactory dataTransferFactory;
try {
blockManagerWorker = injector.getInstance(BlockManagerWorker.class);
- metricManagerWorker = injector.getInstance(MetricManagerWorker.class);
+ metricManagerWorker = injector.getInstance(MetricManagerWorker.class);
serializerManager = injector.getInstance(SerializerManager.class);
serializerManagers.put(blockManagerWorker, serializerManager);
+ dataTransferFactory = injector.getInstance(DataTransferFactory.class);
} catch (final InjectionException e) {
throw new RuntimeException(e);
}
@@ -195,11 +207,11 @@ public final class DataTransferTest {
conToMaster,
messageEnvironment,
serializerManager,
- new DataTransferFactory(HASH_RANGE_MULTIPLIER, blockManagerWorker),
+ dataTransferFactory,
metricManagerWorker);
injector.bindVolatileInstance(Executor.class, executor);
- return blockManagerWorker;
+ return Pair.of(blockManagerWorker, dataTransferFactory);
}
private Injector createNameClientInjector() {
@@ -336,8 +348,7 @@ public final class DataTransferTest {
final List<List> dataWrittenList = new ArrayList<>();
IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
- final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex,
- dummyEdge, sender);
+ final OutputWriter writer = transferFactory.createWriter(srcVertex, srcTaskIndex, dstVertex, dummyEdge);
dataWritten.iterator().forEachRemaining(writer::write);
writer.close();
dataWrittenList.add(dataWritten);
@@ -432,14 +443,12 @@ public final class DataTransferTest {
final List<List> dataWrittenList = new ArrayList<>();
IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
- final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex,
- dummyEdge, sender);
+ final OutputWriter writer = transferFactory.createWriter(srcVertex, srcTaskIndex, dstVertex, dummyEdge);
dataWritten.iterator().forEachRemaining(writer::write);
writer.close();
dataWrittenList.add(dataWritten);
- final OutputWriter writer2 = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex,
- dummyEdge2, sender);
+ final OutputWriter writer2 = transferFactory.createWriter(srcVertex, srcTaskIndex, dstVertex, dummyEdge2);
dataWritten.iterator().forEachRemaining(writer2::write);
writer2.close();
});
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
index 5d24f81..ab1beb7 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
@@ -63,9 +63,8 @@ public class SailfishPassTest {
edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get());
assertEquals(InterTaskDataStoreProperty.Value.SerializedMemoryStore,
edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
- // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
- //assertEquals(BytesDecoderFactory.of(),
- // edgeToMerger.getPropertyValue(DecoderProperty.class).get());
+ assertEquals(BytesDecoderFactory.of(),
+ edgeToMerger.getPropertyValue(DecoderProperty.class).get());
} else {
assertEquals(DataFlowModelProperty.Value.Pull,
edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
@@ -78,9 +77,8 @@ public class SailfishPassTest {
edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get());
assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore,
edgeFromMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
- // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
- //assertEquals(BytesEncoderFactory.of(),
- // edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
+ assertEquals(BytesEncoderFactory.of(),
+ edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
});
} else {
// Non merger vertex.