You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/10/30 01:13:23 UTC
[incubator-nemo] branch master updated: [NEMO-8] Implement PipeManagerMaster/Worker (#129)
This is an automated email from the ASF dual-hosted git repository.
jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 0c826c2 [NEMO-8] Implement PipeManagerMaster/Worker (#129)
0c826c2 is described below
commit 0c826c26a547597b9f97e24c809ff073c2ffd30b
Author: John Yang <jo...@gmail.com>
AuthorDate: Tue Oct 30 10:13:18 2018 +0900
[NEMO-8] Implement PipeManagerMaster/Worker (#129)
JIRA: [NEMO-8: Implement PipeManagerMaster/Worker](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-8)
**Major changes:**
- Supports fully-pipelined data streaming for bounded sources (not unbounded sources)
- Tasks finish after processing all of their input data, since the data is finite
- When a task finishes, it emits all the data it holds (e.g., accumulated GroupByKey results) and closes its outgoing pipes, which notifies downstream tasks that those pipes have ended
- Stream processing of unbounded sources will additionally require watermarks (https://issues.apache.org/jira/browse/NEMO-233)
- Introduces PipeManagerMaster/Worker
- Shares code with BlockManagerMaster/Worker
- Naive, element-wise serialization + compression + writeAndFlush
- This is very likely to cause serious overhead; proper benchmarks will be run and the issues fixed in a later PR.
**Minor changes to note:**
- JobConf#SchedulerImplClassName: Batch and Streaming options
- StreamingPolicyParallelismFive: The default policy + PipeTransferForAllEdgesPass
- Fixes the StreamingScheduler to pass the new streaming integration tests
- Fixes a coder bug in the Beam frontend (PCollectionView coder)
**Tests for the changes:**
- WindowedWordCountITCase#testStreamingSchedulerAndPipeFixedWindow
- WindowedWordCountITCase#testStreamingSchedulerAndPipeSlidingWindow
**Other comments:**
- Also closes "Implement common API for data transfer" (https://issues.apache.org/jira/browse/NEMO-9)
Closes #129
---
checkstyle.xml | 2 +-
.../java/org/apache/nemo/client/JobLauncher.java | 15 +-
.../edge/executionproperty/DataStoreProperty.java | 1 +
.../org/apache/nemo/common/test/ArgBuilder.java | 9 +
.../compiler/frontend/beam/PipelineTranslator.java | 18 +-
.../annotating/PipeTransferForAllEdgesPass.java | 47 +++++
.../main/java/org/apache/nemo/conf/JobConf.java | 8 +
.../examples/beam/WindowedWordCountITCase.java | 48 ++++-
.../policy/StreamingPolicyParallelismFive.java | 57 ++++++
.../runtime/common/message/MessageEnvironment.java | 1 +
.../common/message/ncs/NcsMessageEnvironment.java | 7 +
.../nemo/runtime/common/plan/RuntimeEdge.java | 15 ++
runtime/common/src/main/proto/ControlMessage.proto | 33 +++-
.../org/apache/nemo/runtime/executor/Executor.java | 10 +-
.../executor/bytetransfer/ByteTransfer.java | 12 +-
.../executor/bytetransfer/ByteTransferContext.java | 10 +-
.../ByteTransportChannelInitializer.java | 11 +-
.../executor/bytetransfer/ContextManager.java | 40 +++--
.../executor/bytetransfer/ControlFrameEncoder.java | 1 +
.../runtime/executor/data/BlockManagerWorker.java | 10 +-
.../executor/data/BroadcastManagerWorker.java | 1 +
.../nemo/runtime/executor/data/PipeContainer.java | 150 ++++++++++++++++
.../runtime/executor/data/PipeManagerWorker.java | 199 +++++++++++++++++++++
.../{InputReader.java => BlockInputReader.java} | 125 +++++--------
.../{OutputWriter.java => BlockOutputWriter.java} | 90 +++-------
.../executor/datatransfer/DataTransfer.java | 40 -----
.../runtime/executor/datatransfer/InputReader.java | 155 +---------------
...Factory.java => IntermediateDataIOFactory.java} | 35 +++-
.../executor/datatransfer/OutputWriter.java | 158 ++++------------
.../executor/datatransfer/PipeInputReader.java | 82 +++++++++
.../executor/datatransfer/PipeOutputWriter.java | 136 ++++++++++++++
.../executor/task/ParentTaskDataFetcher.java | 2 +
.../nemo/runtime/executor/task/TaskExecutor.java | 42 ++---
.../executor/datatransfer/DataTransferTest.java | 77 +++++---
.../executor/task/ParentTaskDataFetcherTest.java | 6 +-
.../runtime/executor/task/TaskExecutorTest.java | 25 ++-
.../nemo/runtime/master/PipeManagerMaster.java | 157 ++++++++++++++++
.../nemo/runtime/master/scheduler/Scheduler.java | 2 -
.../master/scheduler/StreamingScheduler.java | 29 ++-
.../runtime/master/scheduler/TaskDispatcher.java | 10 +-
.../master/scheduler/StreamingSchedulerTest.java | 8 +-
.../runtime/master/scheduler/TaskRetryTest.java | 2 +-
42 files changed, 1298 insertions(+), 588 deletions(-)
diff --git a/checkstyle.xml b/checkstyle.xml
index 1ab4089..3d52af6 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -169,7 +169,7 @@ under the License.
<module name="InterfaceIsType"/>
<module name="VisibilityModifier"/>
- <!-- Miscellaneous other checks. -->
+ <!-- Miscellaneous other checks. -->
<!-- See http://checkstyle.sf.net/config_misc.html -->
<module name="ArrayTypeStyle"/>
<module name="FinalParameters"/>
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 7e572df..5a54e6b 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -27,6 +27,7 @@ import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.MessageParameters;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nemo.runtime.master.scheduler.Scheduler;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
import org.apache.reef.client.parameters.JobMessageHandler;
@@ -109,10 +110,11 @@ public final class JobLauncher {
final Configuration bandwidthConfig = getJSONConf(builtJobConf, JobConf.BandwidthJSONPath.class,
JobConf.BandwidthJSONContents.class);
final Configuration clientConf = getClientConf();
+ final Configuration schedulerConf = getSchedulerConf(builtJobConf);
// Merge Job and Driver Confs
jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfg,
- executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration());
+ executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf);
// Get DeployMode Conf
deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);
@@ -249,6 +251,16 @@ public final class JobLauncher {
return jcb.build();
}
+ private static Configuration getSchedulerConf(final Configuration jobConf)
+ throws ClassNotFoundException, InjectionException {
+ final Injector injector = TANG.newInjector(jobConf);
+ final String classImplName = injector.getNamedInstance(JobConf.SchedulerImplClassName.class);
+ final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
+ final Class schedulerImpl = ((Class<Scheduler>) Class.forName(classImplName));
+ jcb.bindImplementation(Scheduler.class, schedulerImpl);
+ return jcb.build();
+ }
+
/**
* Get driver ncs configuration.
*
@@ -331,6 +343,7 @@ public final class JobLauncher {
cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumWorkingThreads.class);
cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class);
cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
+ cl.registerShortNameOfClass(JobConf.SchedulerImplClassName.class);
cl.processCommandLine(args);
return confBuilder.build();
}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
index 8a60871..e53157b 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
@@ -45,6 +45,7 @@ public final class DataStoreProperty extends EdgeExecutionProperty<DataStoreProp
* Possible values of DataStore ExecutionProperty.
*/
public enum Value {
+ Pipe,
MemoryStore,
SerializedMemoryStore,
LocalFileStore,
diff --git a/common/src/main/java/org/apache/nemo/common/test/ArgBuilder.java b/common/src/main/java/org/apache/nemo/common/test/ArgBuilder.java
index cd8e083..954015c 100644
--- a/common/src/main/java/org/apache/nemo/common/test/ArgBuilder.java
+++ b/common/src/main/java/org/apache/nemo/common/test/ArgBuilder.java
@@ -98,6 +98,15 @@ public final class ArgBuilder {
}
/**
+ * @param schedulerName scheduler.
+ * @return builder with the scheduler.
+ */
+ public ArgBuilder addScheduler(final String schedulerName) {
+ args.add(Arrays.asList("-scheduler_impl_class_name", schedulerName));
+ return this;
+ }
+
+ /**
* @return the built arguments.
*/
public String[] build() {
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 58a6d4a..7dc7af6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -419,21 +419,22 @@ public final class PipelineTranslator
*/
private static Coder<?> getCoderForView(final PCollectionView view, final CompositeTransformVertex pipeline) {
final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
- final Coder<?> baseCoder = src.getNode().getInputs().values().stream()
- .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst()
+ final Coder<?> baseCoder = src.getNode().getOutputs().values().stream()
+ .filter(v -> v instanceof PCollection)
+ .map(v -> (PCollection) v)
+ .findFirst()
.orElseThrow(() -> new RuntimeException(String.format("No incoming PCollection to %s", src)))
.getCoder();
+ final KvCoder<?, ?> inputKVCoder = (KvCoder) baseCoder;
final ViewFn viewFn = view.getViewFn();
if (viewFn instanceof PCollectionViews.IterableViewFn) {
- return IterableCoder.of(baseCoder);
+ return IterableCoder.of(inputKVCoder.getValueCoder());
} else if (viewFn instanceof PCollectionViews.ListViewFn) {
- return ListCoder.of(baseCoder);
+ return ListCoder.of(inputKVCoder.getValueCoder());
} else if (viewFn instanceof PCollectionViews.MapViewFn) {
- final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
- return MapCoder.of(inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+ return MapCoder.of(inputKVCoder.getKeyCoder(), inputKVCoder.getValueCoder());
} else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
- final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
- return MapCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()));
+ return MapCoder.of(inputKVCoder.getKeyCoder(), IterableCoder.of(inputKVCoder.getValueCoder()));
} else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
return baseCoder;
} else {
@@ -676,6 +677,7 @@ public final class PipelineTranslator
private static final class OneToOneCommunicationPatternSelector
implements BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> {
private static final OneToOneCommunicationPatternSelector INSTANCE = new OneToOneCommunicationPatternSelector();
+
@Override
public CommunicationPatternProperty.Value apply(final IRVertex src, final IRVertex dst) {
return CommunicationPatternProperty.Value.OneToOne;
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
new file mode 100644
index 0000000..25999dc
--- /dev/null
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+
+/**
+ * Annotate 'Pipe' on all edges.
+ */
+@Annotates(DataStoreProperty.class)
+public final class PipeTransferForAllEdgesPass extends AnnotatingPass {
+ /**
+ * Default constructor.
+ */
+ public PipeTransferForAllEdgesPass() {
+ super(PipeTransferForAllEdgesPass.class);
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ dag.getVertices().forEach(vertex -> {
+ dag.getIncomingEdgesOf(vertex).stream()
+ .forEach(edge -> edge.setPropertyPermanently(
+ DataStoreProperty.of(DataStoreProperty.Value.Pipe)));
+ });
+ return dag;
+ }
+}
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index b78c9fa..7de0e18 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -135,6 +135,14 @@ public final class JobConf extends ConfigurationModuleBuilder {
public final class MaxTaskAttempt implements Name<Integer> {
}
+ /**
+ * Scheduler impl.
+ */
+ @NamedParameter(doc = "Class name of the scheduler to use", short_name = "scheduler_impl_class_name",
+ default_value = "org.apache.nemo.runtime.master.scheduler.BatchScheduler")
+ public final class SchedulerImplClassName implements Name<String> {
+ }
+
//////////////////////////////// Runtime Executor Configurations
/**
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index ed6e0eb..b523543 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -21,11 +21,8 @@ package org.apache.nemo.examples.beam;
import org.apache.nemo.client.JobLauncher;
import org.apache.nemo.common.test.ArgBuilder;
import org.apache.nemo.common.test.ExampleTestUtil;
-import org.apache.nemo.compiler.optimizer.policy.ConditionalLargeShufflePolicy;
import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
import org.apache.nemo.examples.beam.policy.*;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -50,7 +47,7 @@ public final class WindowedWordCountITCase {
private static final String outputFilePath = fileBasePath + outputFileName;
@Test (timeout = TIMEOUT)
- public void testFixedWindow() throws Exception {
+ public void testBatchFixedWindow() throws Exception {
builder = new ArgBuilder()
.addUserMain(WindowedWordCount.class.getCanonicalName())
.addUserArgs(inputFilePath, outputFilePath, "fixed");
@@ -70,7 +67,7 @@ public final class WindowedWordCountITCase {
@Test (timeout = TIMEOUT)
- public void testSlidingWindow() throws Exception {
+ public void testBatchSlidingWindow() throws Exception {
builder = new ArgBuilder()
.addUserMain(WindowedWordCount.class.getCanonicalName())
.addUserArgs(inputFilePath, outputFilePath, "sliding");
@@ -87,4 +84,45 @@ public final class WindowedWordCountITCase {
ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
}
}
+
+ @Test (timeout = TIMEOUT)
+ public void testStreamingSchedulerAndPipeFixedWindow() throws Exception {
+ builder = new ArgBuilder()
+ .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
+ .addUserMain(WindowedWordCount.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath, "fixed");
+
+ JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
+ .addJobId(WindowedWordCountITCase.class.getSimpleName())
+ .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
+ .build());
+
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName);
+ } finally {
+ ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+ }
+ }
+
+
+ @Test (timeout = TIMEOUT)
+ public void testStreamingSchedulerAndPipeSlidingWindow() throws Exception {
+ builder = new ArgBuilder()
+ .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
+ .addUserMain(WindowedWordCount.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath, "sliding");
+
+ JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
+ .addJobId(WindowedWordCountITCase.class.getSimpleName())
+ .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
+ .build());
+
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedSlidingWindowOutputFileName);
+ } finally {
+ ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+ }
+ }
}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/policy/StreamingPolicyParallelismFive.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/policy/StreamingPolicyParallelismFive.java
new file mode 100644
index 0000000..ee59ea3
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/policy/StreamingPolicyParallelismFive.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam.policy;
+
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.eventhandler.PubSubEventHandlerWrapper;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.PipeTransferForAllEdgesPass;
+import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.compiler.optimizer.policy.Policy;
+import org.apache.nemo.compiler.optimizer.policy.PolicyBuilder;
+import org.apache.reef.tang.Injector;
+
+/**
+ * Streaming policy.
+ */
+public final class StreamingPolicyParallelismFive implements Policy {
+ private final Policy policy;
+
+ /**
+ * Default constructor.
+ */
+ public StreamingPolicyParallelismFive() {
+ final PolicyBuilder builder = new PolicyBuilder();
+ PolicyTestUtil.overwriteParallelism(5, DefaultPolicy.BUILDER.getCompileTimePasses())
+ .forEach(ctPass -> builder.registerCompileTimePass(ctPass));
+ builder.registerCompileTimePass(new PipeTransferForAllEdgesPass());
+ this.policy = builder.build();
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
+ return this.policy.runCompileTimeOptimization(dag, dagDirectory);
+ }
+
+ @Override
+ public void registerRunTimeOptimizations(final Injector injector, final PubSubEventHandlerWrapper pubSubWrapper) {
+ this.policy.registerRunTimeOptimizations(injector, pubSubWrapper);
+ }
+}
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageEnvironment.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageEnvironment.java
index 900ae31..1eb3141 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageEnvironment.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageEnvironment.java
@@ -35,6 +35,7 @@ public interface MessageEnvironment {
// The globally known message listener IDs.
String RUNTIME_MASTER_MESSAGE_LISTENER_ID = "RUNTIME_MASTER_MESSAGE_LISTENER_ID";
String BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID = "BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID";
+ String PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID = "PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID";
String EXECUTOR_MESSAGE_LISTENER_ID = "EXECUTOR_MESSAGE_LISTENER_ID";
/**
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 25f7e5e..f335a7f 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -206,12 +206,15 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
case MetricMessageReceived:
case RequestMetricFlush:
case MetricFlushed:
+ case PipeInit:
return MessageType.Send;
case RequestBlockLocation:
case RequestBroadcastVariable:
+ case RequestPipeLoc:
return MessageType.Request;
case BlockLocationInfo:
case InMasterBroadcastVariable:
+ case PipeLocInfo:
return MessageType.Reply;
default:
throw new IllegalArgumentException(controlMessage.toString());
@@ -224,6 +227,8 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
return controlMessage.getRequestBlockLocationMsg().getExecutorId();
case RequestBroadcastVariable:
return controlMessage.getRequestbroadcastVariableMsg().getExecutorId();
+ case RequestPipeLoc:
+ return controlMessage.getRequestPipeLocMsg().getExecutorId();
default:
throw new IllegalArgumentException(controlMessage.toString());
}
@@ -235,6 +240,8 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
return controlMessage.getBlockLocationInfoMsg().getRequestId();
case InMasterBroadcastVariable:
return controlMessage.getBroadcastVariableMsg().getRequestId();
+ case PipeLocInfo:
+ return controlMessage.getPipeLocInfoMsg().getRequestId();
default:
throw new IllegalArgumentException(controlMessage.toString());
}
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
index b0e70c4..bf98a86 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
@@ -64,6 +64,21 @@ public class RuntimeEdge<V extends Vertex> extends Edge<V> {
}
/**
+ * @param executionPropertyKey key
+ * @param <T> type
+ * @return the value
+ */
+ public final <T extends Serializable> T getPropertyValueOrRuntimeException(
+ final Class<? extends EdgeExecutionProperty<T>> executionPropertyKey) {
+ final Optional<T> optional = getPropertyValue(executionPropertyKey);
+ if (optional.isPresent()) {
+ return optional.get();
+ } else {
+ throw new IllegalStateException(executionPropertyKey.toString());
+ }
+ }
+
+ /**
* @return the ExecutionPropertyMap of the Runtime Edge.
*/
public final ExecutionPropertyMap<EdgeExecutionProperty> getExecutionProperties() {
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index 7cc62e0..617f50f 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -68,6 +68,9 @@ enum MessageType {
MetricFlushed = 10;
RequestBroadcastVariable = 11;
InMasterBroadcastVariable = 12;
+ PipeInit = 13;
+ RequestPipeLoc = 14;
+ PipeLocInfo = 15;
}
message Message {
@@ -86,6 +89,9 @@ message Message {
optional DataCollectMessage dataCollected = 13;
optional RequestBroadcastVariableMessage requestbroadcastVariableMsg = 14;
optional InMasterBroadcastVariableMessage broadcastVariableMsg = 15;
+ optional PipeInitMessage pipeInitMsg = 16;
+ optional RequestPipeLocationMessage requestPipeLocMsg = 17;
+ optional PipeLocationInfoMessage pipeLocInfoMsg = 18;
}
// Messages from Master to Executors
@@ -162,15 +168,23 @@ message ByteTransferContextSetupMessage {
required int32 transferIndex = 2;
required ByteTransferDataDirection dataDirection = 3;
required bytes contextDescriptor = 4;
+ required bool isPipe = 5;
}
-message ByteTransferContextDescriptor {
+message BlockTransferContextDescriptor {
required string blockId = 1;
required BlockStore blockStore = 2;
required string runtimeEdgeId = 3;
optional bytes keyRange = 4;
}
+message PipeTransferContextDescriptor {
+ required int64 srcTaskIndex = 1;
+ required string runtimeEdgeId = 2;
+ required int64 dstTaskIndex = 3;
+ required int64 numPipeToWait = 4;
+}
+
enum TaskStateFromExecutor {
READY = 0;
EXECUTING = 1;
@@ -210,3 +224,20 @@ message InMasterBroadcastVariableMessage {
required int64 requestId = 1; // To find the matching request msg
required bytes variable = 2;
}
+
+message PipeInitMessage {
+ required int64 srcTaskIndex = 1;
+ required string runtimeEdgeId = 2;
+ required string executorId = 3;
+}
+
+message RequestPipeLocationMessage {
+ required string executorId = 1;
+ required int64 srcTaskIndex = 2;
+ required string runtimeEdgeId = 3;
+}
+
+message PipeLocationInfoMessage {
+ required int64 requestId = 1; // To find the matching request msg
+ required string executorId = 2;
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index b7597a8..b218bf8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -38,7 +38,7 @@ import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
import org.apache.nemo.runtime.executor.data.SerializerManager;
-import org.apache.nemo.runtime.executor.datatransfer.DataTransferFactory;
+import org.apache.nemo.runtime.executor.datatransfer.IntermediateDataIOFactory;
import org.apache.nemo.runtime.executor.task.TaskExecutor;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@@ -71,7 +71,7 @@ public final class Executor {
/**
* Factory of InputReader/OutputWriter for executing tasks groups.
*/
- private final DataTransferFactory dataTransferFactory;
+ private final IntermediateDataIOFactory intermediateDataIOFactory;
private final BroadcastManagerWorker broadcastManagerWorker;
@@ -84,7 +84,7 @@ public final class Executor {
final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
final MessageEnvironment messageEnvironment,
final SerializerManager serializerManager,
- final DataTransferFactory dataTransferFactory,
+ final IntermediateDataIOFactory intermediateDataIOFactory,
final BroadcastManagerWorker broadcastManagerWorker,
final MetricManagerWorker metricMessageSender) {
this.executorId = executorId;
@@ -93,7 +93,7 @@ public final class Executor {
.build());
this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
this.serializerManager = serializerManager;
- this.dataTransferFactory = dataTransferFactory;
+ this.intermediateDataIOFactory = intermediateDataIOFactory;
this.broadcastManagerWorker = broadcastManagerWorker;
this.metricMessageSender = metricMessageSender;
messageEnvironment.setupListener(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID, new ExecutorMessageReceiver());
@@ -138,7 +138,7 @@ public final class Executor {
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
});
- new TaskExecutor(task, irDag, taskStateManager, dataTransferFactory, broadcastManagerWorker,
+ new TaskExecutor(task, irDag, taskStateManager, intermediateDataIOFactory, broadcastManagerWorker,
metricMessageSender, persistentConnectionToMasterMap).execute();
} catch (final Exception e) {
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransfer.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransfer.java
index 1a09f3e..ef49c36 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransfer.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransfer.java
@@ -52,22 +52,26 @@ public final class ByteTransfer {
* Initiate a transfer context to receive data.
* @param executorId the id of the remote executor
* @param contextDescriptor user-provided descriptor for the new context
+ * @param isPipe is pipe
* @return a {@link ByteInputContext} from which the received data can be read
*/
public CompletableFuture<ByteInputContext> newInputContext(final String executorId,
- final byte[] contextDescriptor) {
- return connectTo(executorId).thenApply(manager -> manager.newInputContext(executorId, contextDescriptor));
+ final byte[] contextDescriptor,
+ final boolean isPipe) {
+ return connectTo(executorId).thenApply(manager -> manager.newInputContext(executorId, contextDescriptor, isPipe));
}
/**
* Initiate a transfer context to send data.
* @param executorId the id of the remote executor
* @param contextDescriptor user-provided descriptor for the new context
+ * @param isPipe is pipe
* @return a {@link ByteOutputContext} to which data can be written
*/
public CompletableFuture<ByteOutputContext> newOutputContext(final String executorId,
- final byte[] contextDescriptor) {
- return connectTo(executorId).thenApply(manager -> manager.newOutputContext(executorId, contextDescriptor));
+ final byte[] contextDescriptor,
+ final boolean isPipe) {
+ return connectTo(executorId).thenApply(manager -> manager.newOutputContext(executorId, contextDescriptor, isPipe));
}
/**
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransferContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
index de2af97..55226f3 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
@@ -141,6 +141,7 @@ public abstract class ByteTransferContext {
private final String partnerExecutorId;
private final ByteTransferDataDirection dataDirection;
private final int transferIndex;
+ private final boolean isPipe;
/**
* Create {@link ContextId}.
@@ -148,15 +149,18 @@ public abstract class ByteTransferContext {
* @param partnerExecutorId the other executor
* @param dataDirection the direction of the data flow
* @param transferIndex an index issued by the initiator
+ * @param isPipe whether this is a pipe context rather than a block context
*/
ContextId(final String initiatorExecutorId,
final String partnerExecutorId,
final ByteTransferDataDirection dataDirection,
- final int transferIndex) {
+ final int transferIndex,
+ final boolean isPipe) {
this.initiatorExecutorId = initiatorExecutorId;
this.partnerExecutorId = partnerExecutorId;
this.dataDirection = dataDirection;
this.transferIndex = transferIndex;
+ this.isPipe = isPipe;
}
public String getInitiatorExecutorId() {
@@ -167,6 +171,10 @@ public abstract class ByteTransferContext {
return partnerExecutorId;
}
+ public boolean isPipe() {
+ return isPipe;
+ }
+
public ByteTransferDataDirection getDataDirection() {
return dataDirection;
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransportChannelInitializer.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransportChannelInitializer.java
index 351fba6..ee80527 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransportChannelInitializer.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransportChannelInitializer.java
@@ -22,6 +22,7 @@ import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
+import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.annotations.Parameter;
@@ -59,6 +60,7 @@ import javax.inject.Inject;
*/
final class ByteTransportChannelInitializer extends ChannelInitializer<SocketChannel> {
+ private final InjectionFuture<PipeManagerWorker> pipeManagerWorker;
private final InjectionFuture<BlockManagerWorker> blockManagerWorker;
private final InjectionFuture<ByteTransfer> byteTransfer;
private final InjectionFuture<ByteTransport> byteTransport;
@@ -69,6 +71,7 @@ final class ByteTransportChannelInitializer extends ChannelInitializer<SocketCha
/**
* Creates a netty channel initializer.
*
+ * @param pipeManagerWorker provides handler for new pipe contexts by remote executors
* @param blockManagerWorker provides handler for new contexts by remote executors
* @param byteTransfer provides channel caching
* @param byteTransport provides {@link io.netty.channel.group.ChannelGroup}
@@ -77,12 +80,14 @@ final class ByteTransportChannelInitializer extends ChannelInitializer<SocketCha
* @param localExecutorId the id of this executor
*/
@Inject
- private ByteTransportChannelInitializer(final InjectionFuture<BlockManagerWorker> blockManagerWorker,
+ private ByteTransportChannelInitializer(final InjectionFuture<PipeManagerWorker> pipeManagerWorker,
+ final InjectionFuture<BlockManagerWorker> blockManagerWorker,
final InjectionFuture<ByteTransfer> byteTransfer,
final InjectionFuture<ByteTransport> byteTransport,
final ControlFrameEncoder controlFrameEncoder,
final DataFrameEncoder dataFrameEncoder,
@Parameter(JobConf.ExecutorId.class) final String localExecutorId) {
+ this.pipeManagerWorker = pipeManagerWorker;
this.blockManagerWorker = blockManagerWorker;
this.byteTransfer = byteTransfer;
this.byteTransport = byteTransport;
@@ -93,8 +98,8 @@ final class ByteTransportChannelInitializer extends ChannelInitializer<SocketCha
@Override
protected void initChannel(final SocketChannel ch) {
- final ContextManager contextManager = new ContextManager(blockManagerWorker.get(), byteTransfer.get(),
- byteTransport.get().getChannelGroup(), localExecutorId, ch);
+ final ContextManager contextManager = new ContextManager(pipeManagerWorker.get(), blockManagerWorker.get(),
+ byteTransfer.get(), byteTransport.get().getChannelGroup(), localExecutorId, ch);
ch.pipeline()
// inbound
.addLast(new FrameDecoder(contextManager))
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ContextManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ContextManager.java
index 377b80c..c694657 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ContextManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ContextManager.java
@@ -24,6 +24,7 @@ import org.apache.nemo.runtime.executor.bytetransfer.ByteTransferContext.Context
import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
+import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -35,6 +36,7 @@ import java.util.function.Function;
*/
final class ContextManager extends SimpleChannelInboundHandler<ByteTransferContextSetupMessage> {
+ private final PipeManagerWorker pipeManagerWorker;
private final BlockManagerWorker blockManagerWorker;
private final ByteTransfer byteTransfer;
private final ChannelGroup channelGroup;
@@ -51,17 +53,20 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
/**
* Creates context manager for this channel.
+ * @param pipeManagerWorker handles new contexts by remote executors that have isPipe set
* @param blockManagerWorker provides handler for new contexts by remote executors
* @param byteTransfer provides channel caching
* @param channelGroup to cleanup this channel when closing {@link ByteTransport}
* @param localExecutorId local executor id
* @param channel the {@link Channel} to manage
*/
- ContextManager(final BlockManagerWorker blockManagerWorker,
+ ContextManager(final PipeManagerWorker pipeManagerWorker,
+ final BlockManagerWorker blockManagerWorker,
final ByteTransfer byteTransfer,
final ChannelGroup channelGroup,
final String localExecutorId,
final Channel channel) {
+ this.pipeManagerWorker = pipeManagerWorker;
this.blockManagerWorker = blockManagerWorker;
this.byteTransfer = byteTransfer;
this.channelGroup = channelGroup;
@@ -103,7 +108,9 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
byteTransfer.onNewContextByRemoteExecutor(message.getInitiatorExecutorId(), channel);
final ByteTransferDataDirection dataDirection = message.getDataDirection();
final int transferIndex = message.getTransferIndex();
- final ContextId contextId = new ContextId(remoteExecutorId, localExecutorId, dataDirection, transferIndex);
+ final boolean isPipe = message.getIsPipe();
+ final ContextId contextId =
+ new ContextId(remoteExecutorId, localExecutorId, dataDirection, transferIndex, isPipe);
final byte[] contextDescriptor = message.getContextDescriptor().toByteArray();
if (dataDirection == ByteTransferDataDirection.INITIATOR_SENDS_DATA) {
@@ -113,7 +120,12 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
}
return new ByteInputContext(remoteExecutorId, contextId, contextDescriptor, this);
});
- blockManagerWorker.onInputContext(context);
+
+ if (isPipe) {
+ pipeManagerWorker.onInputContext(context);
+ } else {
+ blockManagerWorker.onInputContext(context);
+ }
} else {
final ByteOutputContext context = outputContextsInitiatedByRemote.compute(transferIndex, (idx, existing) -> {
if (existing != null) {
@@ -121,7 +133,11 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
}
return new ByteOutputContext(remoteExecutorId, contextId, contextDescriptor, this);
});
- blockManagerWorker.onOutputContext(context);
+ if (isPipe) {
+ pipeManagerWorker.onOutputContext(context);
+ } else {
+ blockManagerWorker.onOutputContext(context);
+ }
}
}
@@ -147,16 +163,18 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
* @param contextGenerator a function that returns context from context id
* @param executorId id of the remote executor
* @param <T> {@link ByteInputContext} or {@link ByteOutputContext}
+ * @param isPipe is a pipe context
* @return generated context
*/
<T extends ByteTransferContext> T newContext(final ConcurrentMap<Integer, T> contexts,
final AtomicInteger transferIndexCounter,
final ByteTransferDataDirection dataDirection,
final Function<ContextId, T> contextGenerator,
- final String executorId) {
+ final String executorId,
+ final boolean isPipe) {
setRemoteExecutorId(executorId);
final int transferIndex = transferIndexCounter.getAndIncrement();
- final ContextId contextId = new ContextId(localExecutorId, executorId, dataDirection, transferIndex);
+ final ContextId contextId = new ContextId(localExecutorId, executorId, dataDirection, transferIndex, isPipe);
final T context = contexts.compute(transferIndex, (index, existingContext) -> {
if (existingContext != null) {
throw new RuntimeException(String.format("Duplicate ContextId: %s", contextId));
@@ -171,26 +189,28 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
* Create a new {@link ByteInputContext}.
* @param executorId target executor id
* @param contextDescriptor the context descriptor
+ * @param isPipe whether this is a pipe context rather than a block context
* @return new {@link ByteInputContext}
*/
- ByteInputContext newInputContext(final String executorId, final byte[] contextDescriptor) {
+ ByteInputContext newInputContext(final String executorId, final byte[] contextDescriptor, final boolean isPipe) {
return newContext(inputContextsInitiatedByLocal, nextInputTransferIndex,
ByteTransferDataDirection.INITIATOR_RECEIVES_DATA,
contextId -> new ByteInputContext(executorId, contextId, contextDescriptor, this),
- executorId);
+ executorId, isPipe);
}
/**
* Create a new {@link ByteOutputContext}.
* @param executorId target executor id
* @param contextDescriptor the context descriptor
+ * @param isPipe whether this is a pipe context rather than a block context
* @return new {@link ByteOutputContext}
*/
- ByteOutputContext newOutputContext(final String executorId, final byte[] contextDescriptor) {
+ ByteOutputContext newOutputContext(final String executorId, final byte[] contextDescriptor, final boolean isPipe) {
return newContext(outputContextsInitiatedByLocal, nextOutputTransferIndex,
ByteTransferDataDirection.INITIATOR_SENDS_DATA,
contextId -> new ByteOutputContext(executorId, contextId, contextDescriptor, this),
- executorId);
+ executorId, isPipe);
}
/**
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ControlFrameEncoder.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ControlFrameEncoder.java
index f303017..836177a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ControlFrameEncoder.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ControlFrameEncoder.java
@@ -62,6 +62,7 @@ final class ControlFrameEncoder extends MessageToMessageEncoder<ByteTransferCont
.setTransferIndex(in.getContextId().getTransferIndex())
.setDataDirection(in.getContextId().getDataDirection())
.setContextDescriptor(ByteString.copyFrom(in.getContextDescriptor()))
+ .setIsPipe(in.getContextId().isPipe())
.build();
final byte[] frameBody = message.toByteArray();
out.add(ZEROS.retain());
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
index 6fc53c4..b404592 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -29,7 +29,6 @@ import org.apache.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
-import org.apache.nemo.runtime.common.comm.ControlMessage.ByteTransferContextDescriptor;
import org.apache.nemo.common.KeyRange;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -199,7 +198,8 @@ public final class BlockManagerWorker {
// Block resides in the evaluator
return getDataFromLocalBlock(blockId, blockStore, keyRange);
} else {
- final ByteTransferContextDescriptor descriptor = ByteTransferContextDescriptor.newBuilder()
+ final ControlMessage.BlockTransferContextDescriptor descriptor =
+ ControlMessage.BlockTransferContextDescriptor.newBuilder()
.setBlockId(blockId)
.setBlockStore(convertBlockStore(blockStore))
.setRuntimeEdgeId(runtimeEdgeId)
@@ -207,7 +207,7 @@ public final class BlockManagerWorker {
.build();
final CompletableFuture<ByteInputContext> contextFuture = blockTransferThrottler
.requestTransferPermission(runtimeEdgeId)
- .thenCompose(obj -> byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray()));
+ .thenCompose(obj -> byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray(), false));
// whenComplete() ensures that blockTransferThrottler.onTransferFinished() is always called,
// even on failures. Actual failure handling and Task retry will be done by DataFetcher.
@@ -328,8 +328,8 @@ public final class BlockManagerWorker {
* @throws InvalidProtocolBufferException from errors during parsing context descriptor
*/
public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
- final ByteTransferContextDescriptor descriptor = ByteTransferContextDescriptor.PARSER
- .parseFrom(outputContext.getContextDescriptor());
+ final ControlMessage.BlockTransferContextDescriptor descriptor =
+ ControlMessage.BlockTransferContextDescriptor.PARSER.parseFrom(outputContext.getContextDescriptor());
final DataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
final String blockId = descriptor.getBlockId();
final KeyRange keyRange = SerializationUtils.deserialize(descriptor.getKeyRange().toByteArray());
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
index 084f0ef..c8a87fc 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
@@ -128,6 +128,7 @@ public final class BroadcastManagerWorker {
* @return the variable.
*/
public Object get(final Serializable id) {
+ LOG.debug("get {}", id);
try {
return idToVariableCache.get(id);
} catch (ExecutionException e) {
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeContainer.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeContainer.java
new file mode 100644
index 0000000..a49ef78
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeContainer.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Stores the pipes ({@link ByteOutputContext}s) of each (runtime edge id, source task index) key.
+ *
+ * Each key is initialized exactly once with {@link PipeContainer#putPipeListIfAbsent(Pair, int)},
+ * and its container is never replaced afterwards.
+ *
+ * Reads and writes of the same key may interleave: callers initialize the key before putting or
+ * getting pipes, and {@link CountBasedBlockingContainer} guards its own state with a lock.
+ *
+ * Concurrent reads are safe as well.
+ */
+@ThreadSafe
+public final class PipeContainer {
+  private static final Logger LOG = LoggerFactory.getLogger(PipeContainer.class.getName());
+  private final ConcurrentHashMap<Pair<String, Long>, CountBasedBlockingContainer<ByteOutputContext>> pipeMap;
+
+  PipeContainer() {
+    this.pipeMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Holds values by index, and blocks readers until the expected number of values has been set.
+   * Static, as it needs no reference to the enclosing {@link PipeContainer}.
+   *
+   * @param <T> type of the value.
+   */
+  static class CountBasedBlockingContainer<T> {
+    private final Map<Integer, T> indexToValue;
+    private final int expected;
+    private final Lock lock;
+    private final Condition condition;
+
+    CountBasedBlockingContainer(final int expected) {
+      this.indexToValue = new HashMap<>(expected);
+      this.expected = expected;
+      this.lock = new ReentrantLock();
+      this.condition = lock.newCondition();
+    }
+
+    /**
+     * Waits until exactly {@code expected} values have been set, and returns a copy of them.
+     *
+     * @return the values, in the iteration order of the underlying {@link HashMap}.
+     * @throws RuntimeException if the waiting thread is interrupted.
+     */
+    public List<T> getValuesBlocking() {
+      lock.lock();
+      try {
+        // A loop rather than a single check, because Condition#await may wake up spuriously.
+        while (!isCountSatisfied()) {
+          condition.await();
+        }
+        return new ArrayList<>(indexToValue.values());
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so that code further up the stack can observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Sets the value of an index, and wakes up the waiting readers once the expected count is reached.
+     *
+     * @param index the index (e.g., a destination task index).
+     * @param value the value to set.
+     * @throws IllegalStateException if the index already has a value, or if too many values are set.
+     */
+    public void setValue(final int index, final T value) {
+      lock.lock();
+      try {
+        // putIfAbsent leaves the existing value intact when an index is reported twice.
+        final T previous = indexToValue.putIfAbsent(index, value);
+        if (null != previous) {
+          throw new IllegalStateException(previous.toString());
+        }
+
+        if (isCountSatisfied()) {
+          condition.signalAll();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Checks the count. Must be called while holding {@link #lock}.
+     *
+     * @return whether exactly {@code expected} values have been set.
+     * @throws IllegalStateException if more than {@code expected} values have been set.
+     */
+    private boolean isCountSatisfied() {
+      final int size = indexToValue.size();
+      if (size < expected) {
+        return false;
+      } else if (size == expected) {
+        return true;
+      } else {
+        throw new IllegalStateException(size + " > " + expected);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return indexToValue.toString();
+    }
+  }
+
+  /**
+   * Initializes the container of the key exactly once.
+   * Later calls for the same key are no-ops, even if they pass a different {@code expected}.
+   *
+   * @param pairKey  (runtime edge id, source task index).
+   * @param expected the number of pipes to wait for.
+   */
+  void putPipeListIfAbsent(final Pair<String, Long> pairKey, final int expected) {
+    // ConcurrentHashMap#computeIfAbsent is atomic, and only creates a container when the key is absent.
+    pipeMap.computeIfAbsent(pairKey, key -> new CountBasedBlockingContainer<>(expected));
+  }
+
+  /**
+   * Adds a pipe of the key. Synchronization is handled by {@link CountBasedBlockingContainer}.
+   *
+   * @param pairKey           (runtime edge id, source task index); must have been initialized.
+   * @param dstTaskIndex      the index of the destination task of the pipe.
+   * @param byteOutputContext the pipe.
+   */
+  void putPipe(final Pair<String, Long> pairKey, final int dstTaskIndex, final ByteOutputContext byteOutputContext) {
+    getContainer(pairKey).setValue(dstTaskIndex, byteOutputContext);
+  }
+
+  /**
+   * Returns the pipes of the key, blocking until all the expected pipes have been added.
+   * Synchronization is handled by {@link CountBasedBlockingContainer}.
+   *
+   * @param pairKey (runtime edge id, source task index); must have been initialized.
+   * @return the pipes of the key.
+   */
+  List<ByteOutputContext> getPipes(final Pair<String, Long> pairKey) {
+    return getContainer(pairKey).getValuesBlocking();
+  }
+
+  /**
+   * Looks up the container of the key.
+   *
+   * @param pairKey the key.
+   * @return the container of the key.
+   * @throws IllegalStateException if the key has not been initialized.
+   */
+  private CountBasedBlockingContainer<ByteOutputContext> getContainer(final Pair<String, Long> pairKey) {
+    final CountBasedBlockingContainer<ByteOutputContext> container = pipeMap.get(pairKey);
+    if (container == null) {
+      throw new IllegalStateException("Pipe list is not initialized for " + pairKey);
+    }
+    return container;
+  }
+}
+
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
new file mode 100644
index 0000000..ffbbe56
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.conf.JobConf;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
+import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.executor.bytetransfer.ByteInputContext;
+import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
+import org.apache.nemo.runtime.executor.bytetransfer.ByteTransfer;
+import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Executor-side manager of pipes, through which data is streamed from source tasks to destination tasks.
+ *
+ * Two threads use this class:
+ * - Network thread: saves the pipe connections created by (remote) destination tasks.
+ * - Task executor thread: creates new pipe connections to the executors of source tasks (read),
+ *   or retrieves the saved pipe connections (write).
+ */
+@ThreadSafe
+public final class PipeManagerWorker {
+  private static final Logger LOG = LoggerFactory.getLogger(PipeManagerWorker.class.getName());
+
+  private final String executorId;
+  private final SerializerManager serializerManager;
+
+  // To-Executor connections
+  private final ByteTransfer byteTransfer;
+
+  // Thread-safe container
+  private final PipeContainer pipeContainer;
+
+  private final PersistentConnectionToMasterMap toMaster;
+
+  /**
+   * Constructor.
+   *
+   * @param executorId        the id of this executor.
+   * @param byteTransfer      creates contexts (connections) to other executors.
+   * @param serializerManager provides the serializer of each runtime edge.
+   * @param toMaster          message senders to the master.
+   */
+  @Inject
+  private PipeManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId,
+                            final ByteTransfer byteTransfer,
+                            final SerializerManager serializerManager,
+                            final PersistentConnectionToMasterMap toMaster) {
+    this.executorId = executorId;
+    this.byteTransfer = byteTransfer;
+    this.serializerManager = serializerManager;
+    this.pipeContainer = new PipeContainer();
+    this.toMaster = toMaster;
+  }
+
+  /**
+   * Reads the data of a source task through a pipe.
+   * Asks the master for the executor of the source task, and then opens an input context to that executor.
+   * Both steps are asynchronous, so this method does not block.
+   *
+   * @param srcTaskIndex the index of the source task to read from.
+   * @param runtimeEdge  the edge from the source task to the destination task; must be a {@link StageEdge}.
+   * @param dstTaskIndex the index of the reading (destination) task.
+   * @return a future of the iterator over the received data.
+   */
+  public CompletableFuture<DataUtil.IteratorWithNumBytes> read(final int srcTaskIndex,
+                                                               final RuntimeEdge runtimeEdge,
+                                                               final int dstTaskIndex) {
+    final String runtimeEdgeId = runtimeEdge.getId();
+    // Request the location of the src task (the response arrives asynchronously)
+    final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = toMaster
+        .getMessageSender(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
+            ControlMessage.Message.newBuilder()
+                .setId(RuntimeIdManager.generateMessageId())
+                .setListenerId(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID)
+                .setType(ControlMessage.MessageType.RequestPipeLoc)
+                .setRequestPipeLocMsg(
+                    ControlMessage.RequestPipeLocationMessage.newBuilder()
+                        .setExecutorId(executorId)
+                        .setRuntimeEdgeId(runtimeEdgeId)
+                        .setSrcTaskIndex(srcTaskIndex)
+                        .build())
+                .build());
+
+    return responseFromMasterFuture.thenCompose(responseFromMaster -> {
+      // Get the id of the executor that has the src task's pipe
+      if (responseFromMaster.getType() != ControlMessage.MessageType.PipeLocInfo) {
+        throw new RuntimeException("Response message type mismatch!");
+      }
+      final ControlMessage.PipeLocationInfoMessage pipeLocInfo = responseFromMaster.getPipeLocInfoMsg();
+      if (!pipeLocInfo.hasExecutorId()) {
+        throw new IllegalStateException(
+            "No executor id in the pipe location of edge " + runtimeEdgeId + ", src task " + srcTaskIndex);
+      }
+      final String targetExecutorId = pipeLocInfo.getExecutorId();
+
+      // Descriptor, parsed by onOutputContext() of the PipeManagerWorker in the target executor
+      final ControlMessage.PipeTransferContextDescriptor descriptor =
+          ControlMessage.PipeTransferContextDescriptor.newBuilder()
+              .setRuntimeEdgeId(runtimeEdgeId)
+              .setSrcTaskIndex(srcTaskIndex)
+              .setDstTaskIndex(dstTaskIndex)
+              .setNumPipeToWait(getNumOfPipeToWait(runtimeEdge))
+              .build();
+
+      // Connect to the executor
+      return byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray(), true)
+          .thenApply(context -> new DataUtil.InputStreamIterator(context.getInputStreams(),
+              serializerManager.getSerializer(runtimeEdgeId)));
+    });
+  }
+
+  /**
+   * Sends a PipeInit message (runtime edge id, source task index, and the id of this executor) to the master.
+   * NOTE(review): presumably the master records it to answer the location requests made by read(); confirm.
+   *
+   * @param runtimeEdgeId the id of the runtime edge.
+   * @param srcTaskIndex  the index of the source task.
+   */
+  public void notifyMaster(final String runtimeEdgeId, final long srcTaskIndex) {
+    // Notify the master that we're using this pipe.
+    toMaster.getMessageSender(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID).send(
+        ControlMessage.Message.newBuilder()
+            .setId(RuntimeIdManager.generateMessageId())
+            .setListenerId(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID)
+            .setType(ControlMessage.MessageType.PipeInit)
+            .setPipeInitMsg(ControlMessage.PipeInitMessage.newBuilder()
+                .setRuntimeEdgeId(runtimeEdgeId)
+                .setSrcTaskIndex(srcTaskIndex)
+                .setExecutorId(executorId)
+                .build())
+            .build());
+  }
+
+  /**
+   * Returns the pipes from a source task to its destination tasks. Called by task threads.
+   * Blocks until all the expected destination tasks have connected.
+   *
+   * @param runtimeEdge  the outgoing edge of the source task; must be a {@link StageEdge}.
+   * @param srcTaskIndex the index of the source task.
+   * @return output contexts.
+   */
+  public List<ByteOutputContext> getOutputContexts(final RuntimeEdge runtimeEdge,
+                                                   final long srcTaskIndex) {
+    // First, initialize the pair key
+    final Pair<String, Long> pairKey = Pair.of(runtimeEdge.getId(), srcTaskIndex);
+    pipeContainer.putPipeListIfAbsent(pairKey, getNumOfPipeToWait(runtimeEdge));
+
+    // Then, wait for all the pipes of the key
+    return pipeContainer.getPipes(pairKey); // blocking call
+  }
+
+  /**
+   * Returns the serializer of the given runtime edge.
+   *
+   * @param runtimeEdgeId the id of the runtime edge.
+   * @return the serializer, from the {@link SerializerManager}.
+   */
+  public Serializer getSerializer(final String runtimeEdgeId) {
+    return serializerManager.getSerializer(runtimeEdgeId);
+  }
+
+  /**
+   * Saves a pipe connection created by a remote destination task. Called by network threads.
+   *
+   * @param outputContext the output context, whose descriptor is a PipeTransferContextDescriptor.
+   * @throws InvalidProtocolBufferException if the context descriptor cannot be parsed.
+   */
+  public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
+    final ControlMessage.PipeTransferContextDescriptor descriptor =
+        ControlMessage.PipeTransferContextDescriptor.PARSER.parseFrom(outputContext.getContextDescriptor());
+
+    final long srcTaskIndex = descriptor.getSrcTaskIndex();
+    final String runtimeEdgeId = descriptor.getRuntimeEdgeId();
+    final int dstTaskIndex = (int) descriptor.getDstTaskIndex();
+    final int numPipeToWait = (int) descriptor.getNumPipeToWait();
+    final Pair<String, Long> pairKey = Pair.of(runtimeEdgeId, srcTaskIndex);
+
+    // First, initialize the pair key
+    pipeContainer.putPipeListIfAbsent(pairKey, numPipeToWait);
+
+    // Then, save the pipe (a task thread waiting in getOutputContexts() wakes up once all pipes are in)
+    pipeContainer.putPipe(pairKey, dstTaskIndex, outputContext);
+  }
+
+  /**
+   * Not supported: in this class, pipe contexts are only initiated by {@link #read}, which receives data.
+   *
+   * @param inputContext the input context.
+   * @throws InvalidProtocolBufferException never.
+   * @throws UnsupportedOperationException always.
+   */
+  public void onInputContext(final ByteInputContext inputContext) throws InvalidProtocolBufferException {
+    throw new UnsupportedOperationException("Pipe input contexts initiated by remote executors are not supported");
+  }
+
+  /**
+   * Computes the number of pipes that a source task waits for.
+   *
+   * @param runtimeEdge the edge; must be a {@link StageEdge}.
+   * @return 1 for a OneToOne edge, otherwise the parallelism of the destination vertex.
+   * @throws IllegalArgumentException if the edge is not a {@link StageEdge}.
+   * @throws IllegalStateException if the parallelism or the communication pattern is missing.
+   */
+  private int getNumOfPipeToWait(final RuntimeEdge runtimeEdge) {
+    if (!(runtimeEdge instanceof StageEdge)) {
+      throw new IllegalArgumentException("Pipes require a StageEdge, but got: " + runtimeEdge.getId());
+    }
+    final StageEdge stageEdge = (StageEdge) runtimeEdge;
+    final int dstParallelism = stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class)
+        .orElseThrow(() -> new IllegalStateException("No dst parallelism for " + stageEdge.getId()));
+    final CommunicationPatternProperty.Value commPattern = stageEdge
+        .getPropertyValue(CommunicationPatternProperty.class)
+        .orElseThrow(() -> new IllegalStateException("No communication pattern for " + stageEdge.getId()));
+
+    return commPattern.equals(CommunicationPatternProperty.Value.OneToOne) ? 1 : dstParallelism;
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.java
similarity index 64%
copy from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
copy to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.java
index 1bd661d..12c6053 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.java
@@ -18,62 +18,52 @@
*/
package org.apache.nemo.runtime.executor.datatransfer;
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.exception.BlockFetchException;
+import org.apache.nemo.common.exception.UnsupportedCommPatternException;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
-import org.apache.nemo.common.KeyRange;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.common.plan.StageEdge;
-import org.apache.nemo.common.exception.BlockFetchException;
-import org.apache.nemo.common.exception.UnsupportedCommPatternException;
-import org.apache.nemo.common.HashRange;
import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
import org.apache.nemo.runtime.executor.data.DataUtil;
import java.util.*;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
/**
* Represents the input data transfer to a task.
*/
-public final class InputReader extends DataTransfer {
- private final int dstTaskIndex;
+public final class BlockInputReader implements InputReader {
private final BlockManagerWorker blockManagerWorker;
+ private final int dstTaskIndex;
+
/**
* Attributes that specify how we should read the input.
*/
private final IRVertex srcVertex;
private final RuntimeEdge runtimeEdge;
- public InputReader(final int dstTaskIndex,
- final IRVertex srcVertex,
- final RuntimeEdge runtimeEdge,
- final BlockManagerWorker blockManagerWorker) {
- super(runtimeEdge.getId());
+ BlockInputReader(final int dstTaskIndex,
+ final IRVertex srcVertex,
+ final RuntimeEdge runtimeEdge,
+ final BlockManagerWorker blockManagerWorker) {
this.dstTaskIndex = dstTaskIndex;
this.srcVertex = srcVertex;
this.runtimeEdge = runtimeEdge;
this.blockManagerWorker = blockManagerWorker;
}
- /**
- * Reads input data depending on the communication pattern of the srcVertex.
- *
- * @return the read data.
- */
+ @Override
public List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read() {
final Optional<CommunicationPatternProperty.Value> comValue =
- runtimeEdge.getPropertyValue(CommunicationPatternProperty.class);
+ runtimeEdge.getPropertyValue(CommunicationPatternProperty.class);
if (comValue.get().equals(CommunicationPatternProperty.Value.OneToOne)) {
return Collections.singletonList(readOneToOne());
@@ -88,22 +78,43 @@ public final class InputReader extends DataTransfer {
}
}
+ /**
+ * @return the source vertex given at construction.
+ */
+ @Override
+ public IRVertex getSrcIrVertex() {
+ return srcVertex;
+ }
+
+ /**
+ * Builds the wildcard block id matching any task attempt of the given producer task.
+ * When the edge belongs to a duplicate edge group with more than one member, the group's
+ * representative edge id is used in place of this edge's id.
+ * See {@link RuntimeIdManager#generateBlockIdWildcard(String, int)} for information on block wildcards.
+ *
+ * @param producerTaskIndex to use.
+ * @return wildcard block id that corresponds to "ANY" task attempt of the task index.
+ */
+ private String generateWildCardBlockId(final int producerTaskIndex) {
+ final Optional<DuplicateEdgeGroupPropertyValue> duplicateGroup =
+ runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+ final boolean useRepresentative = duplicateGroup.isPresent() && duplicateGroup.get().getGroupSize() > 1;
+ final String owningEdgeId = useRepresentative
+ ? duplicateGroup.get().getRepresentativeEdgeId()
+ : runtimeEdge.getId();
+ return RuntimeIdManager.generateBlockIdWildcard(owningEdgeId, producerTaskIndex);
+ }
+
+ /**
+ * Reads, over the full hash range, the block written by the source task whose index equals this task's index.
+ * NOTE(review): dataStoreProperty.get() assumes the edge always carries a DataStoreProperty.
+ *
+ * @return a future of the read data.
+ */
private CompletableFuture<DataUtil.IteratorWithNumBytes> readOneToOne() {
final String blockIdWildcard = generateWildCardBlockId(dstTaskIndex);
final Optional<DataStoreProperty.Value> dataStoreProperty
- = runtimeEdge.getPropertyValue(DataStoreProperty.class);
- return blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), HashRange.all());
+ = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+ return blockManagerWorker.readBlock(blockIdWildcard, runtimeEdge.getId(), dataStoreProperty.get(), HashRange.all());
}
private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
- final int numSrcTasks = this.getSourceParallelism();
+ final int numSrcTasks = InputReader.getSourceParallelism(this);
final Optional<DataStoreProperty.Value> dataStoreProperty
- = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+ = runtimeEdge.getPropertyValue(DataStoreProperty.class);
final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
- futures.add(blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), HashRange.all()));
+ futures.add(blockManagerWorker.readBlock(
+ blockIdWildcard, runtimeEdge.getId(), dataStoreProperty.get(), HashRange.all()));
}
return futures;
@@ -117,72 +128,22 @@ public final class InputReader extends DataTransfer {
private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
assert (runtimeEdge instanceof StageEdge);
final Optional<DataStoreProperty.Value> dataStoreProperty
- = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+ = runtimeEdge.getPropertyValue(DataStoreProperty.class);
- ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+ // Only the key range assigned to this task is read, from the block of every source task.
final KeyRange hashRangeToRead = ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
if (hashRangeToRead == null) {
throw new BlockFetchException(
- new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
+ new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
}
- final int numSrcTasks = this.getSourceParallelism();
+ final int numSrcTasks = InputReader.getSourceParallelism(this);
final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
futures.add(
- blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), hashRangeToRead));
+ blockManagerWorker.readBlock(blockIdWildcard, runtimeEdge.getId(), dataStoreProperty.get(), hashRangeToRead));
}
return futures;
}
-
- /**
- * See {@link RuntimeIdManager#generateBlockIdWildcard(String, int)} for information on block wildcards.
- * @param producerTaskIndex to use.
- * @return wildcard block id that corresponds to "ANY" task attempt of the task index.
- */
- private String generateWildCardBlockId(final int producerTaskIndex) {
- final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
- runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
- if (!duplicateDataProperty.isPresent() || duplicateDataProperty.get().getGroupSize() <= 1) {
- return RuntimeIdManager.generateBlockIdWildcard(getId(), producerTaskIndex);
- }
- final String duplicateEdgeId = duplicateDataProperty.get().getRepresentativeEdgeId();
- return RuntimeIdManager.generateBlockIdWildcard(duplicateEdgeId, producerTaskIndex);
- }
-
- public IRVertex getSrcIrVertex() {
- return srcVertex;
- }
-
- /**
- * Get the parallelism of the source task.
- *
- * @return the parallelism of the source task.
- */
- public int getSourceParallelism() {
- return srcVertex.getPropertyValue(ParallelismProperty.class).
- orElseThrow(() -> new RuntimeException("No parallelism property on this edge."));
- }
-
- /**
- * Combine the given list of futures.
- *
- * @param futures to combine.
- * @return the combined iterable of elements.
- * @throws ExecutionException when fail to get results from futures.
- * @throws InterruptedException when interrupted during getting results from futures.
- */
- @VisibleForTesting
- public static Iterator combineFutures(final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures)
- throws ExecutionException, InterruptedException {
- final List concatStreamBase = new ArrayList<>();
- Stream<Object> concatStream = concatStreamBase.stream();
- for (int srcTaskIdx = 0; srcTaskIdx < futures.size(); srcTaskIdx++) {
- final Iterator dataFromATask = futures.get(srcTaskIdx).get();
- final Iterable iterable = () -> dataFromATask;
- concatStream = Stream.concat(concatStream, StreamSupport.stream(iterable.spliterator(), false));
- }
- return concatStream.collect(Collectors.toList()).iterator();
- }
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
similarity index 58%
copy from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
copy to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
index ebdbd1b..2391533 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
@@ -18,8 +18,6 @@
*/
package org.apache.nemo.runtime.executor.datatransfer;
-import org.apache.nemo.common.KeyExtractor;
-import org.apache.nemo.common.exception.*;
import org.apache.nemo.common.ir.edge.executionproperty.*;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
@@ -29,20 +27,23 @@ import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
import org.apache.nemo.runtime.executor.data.block.Block;
import org.apache.nemo.runtime.executor.data.partitioner.*;
-import java.util.*;
+import java.util.Map;
+import java.util.Optional;
/**
* Represents the output data transfer from a task.
*/
-public final class OutputWriter extends DataTransfer implements AutoCloseable {
+public final class BlockOutputWriter implements OutputWriter {
private final RuntimeEdge<?> runtimeEdge;
private final IRVertex dstIrVertex;
+ private final Partitioner partitioner;
+
private final DataStoreProperty.Value blockStoreValue;
private final BlockManagerWorker blockManagerWorker;
- private final boolean nonDummyBlock;
private final Block blockToWrite;
+ private final boolean nonDummyBlock;
+
private long writtenBytes;
- private Partitioner partitioner;
/**
* Constructor.
@@ -53,46 +54,20 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
* @param runtimeEdge the {@link RuntimeEdge}.
* @param blockManagerWorker the {@link BlockManagerWorker}.
*/
- OutputWriter(final int hashRangeMultiplier,
- final String srcTaskId,
- final IRVertex dstIrVertex,
- final RuntimeEdge<?> runtimeEdge,
- final BlockManagerWorker blockManagerWorker) {
- super(runtimeEdge.getId());
+ BlockOutputWriter(final int hashRangeMultiplier,
+ final String srcTaskId,
+ final IRVertex dstIrVertex,
+ final RuntimeEdge<?> runtimeEdge,
+ final BlockManagerWorker blockManagerWorker) {
this.runtimeEdge = runtimeEdge;
this.dstIrVertex = dstIrVertex;
- this.blockManagerWorker = blockManagerWorker;
- this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).
- orElseThrow(() -> new RuntimeException("No data store property on the edge"));
- // Setup partitioner
- final int dstParallelism = dstIrVertex.getPropertyValue(ParallelismProperty.class).
- orElseThrow(() -> new RuntimeException("No parallelism property on the destination vertex"));
- final Optional<KeyExtractor> keyExtractor = runtimeEdge.getPropertyValue(KeyExtractorProperty.class);
- final PartitionerProperty.Value partitionerPropertyValue =
- runtimeEdge.getPropertyValue(PartitionerProperty.class).
- orElseThrow(() -> new RuntimeException("No partitioner property on the edge"));
- switch (partitionerPropertyValue) {
- case IntactPartitioner:
- this.partitioner = new IntactPartitioner();
- break;
- case HashPartitioner:
- this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.
- orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
- break;
- case DataSkewHashPartitioner:
- this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.
- orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
- break;
- case DedicatedKeyPerElementPartitioner:
- this.partitioner = new DedicatedKeyPerElementPartitioner();
- break;
- default:
- throw new UnsupportedPartitionerException(
- new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
- }
+ this.partitioner = OutputWriter.getPartitioner(runtimeEdge, hashRangeMultiplier);
+ this.blockManagerWorker = blockManagerWorker;
+ this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class)
+ .orElseThrow(() -> new RuntimeException("No data store property on the edge"));
blockToWrite = blockManagerWorker.createBlock(
- RuntimeIdManager.generateBlockId(getId(), srcTaskId), blockStoreValue);
+ RuntimeIdManager.generateBlockId(runtimeEdge.getId(), srcTaskId), blockStoreValue);
final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
nonDummyBlock = !duplicateDataProperty.isPresent()
@@ -100,11 +75,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
|| duplicateDataProperty.get().getGroupSize() <= 1;
}
- /**
- * Writes output element depending on the communication pattern of the edge.
- *
- * @param element the element to write.
- */
+ @Override
public void write(final Object element) {
if (nonDummyBlock) {
blockToWrite.write(partitioner.partition(element), element);
@@ -121,11 +92,11 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
* Notifies that all writes for a block is end.
* Further write about a committed block will throw an exception.
*/
+ @Override
public void close() {
// Commit block.
- final DataPersistenceProperty.Value persistence =
- runtimeEdge.getPropertyValue(DataPersistenceProperty.class).
- orElseThrow(() -> new RuntimeException("No data persistence property on the edge"));
+ // Fail with a descriptive message (like the constructor does for DataStoreProperty) instead of a bare get().
+ final DataPersistenceProperty.Value persistence = runtimeEdge.getPropertyValue(DataPersistenceProperty.class)
+ .orElseThrow(() -> new RuntimeException("No data persistence property on the edge"));
final Optional<Map<Integer, Long>> partitionSizeMap = blockToWrite.commit();
// Return the total size of the committed block.
@@ -134,16 +105,13 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
for (final long partitionSize : partitionSizeMap.get().values()) {
blockSizeTotal += partitionSize;
}
- this.writtenBytes = blockSizeTotal;
+ writtenBytes = blockSizeTotal;
} else {
- this.writtenBytes = -1; // no written bytes info.
+ writtenBytes = -1; // no written bytes info.
}
blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, getExpectedRead(), persistence);
}
- /**
- * @return the total written bytes.
- */
+ @Override
public Optional<Long> getWrittenBytes() {
+ // -1 is the sentinel close() stores when the committed block reports no partition sizes.
if (writtenBytes == -1) {
return Optional.empty();
@@ -160,14 +128,14 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
*/
private int getExpectedRead() {
final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
- runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+ runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
final int duplicatedDataMultiplier =
- duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
+ duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
final int readForABlock = CommunicationPatternProperty.Value.OneToOne.equals(
- runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(
- () -> new RuntimeException("No communication pattern on this edge.")))
- ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(
- () -> new RuntimeException("No parallelism property on the destination vertex."));
+ runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(
+ () -> new RuntimeException("No communication pattern on this edge.")))
+ ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(
+ () -> new RuntimeException("No parallelism property on the destination vertex."));
+ // A OneToOne block is read once; otherwise once per destination task.
+ // That count is multiplied by the duplicate edge group size (1 when there is no group).
return readForABlock * duplicatedDataMultiplier;
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransfer.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransfer.java
deleted file mode 100644
index dcaae52..0000000
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransfer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.runtime.executor.datatransfer;
-
-
-/**
- * Contains common parts involved in {@link InputReader} and {@link OutputWriter}.
- * The two classes are involved in
- * intermediate data transfer between {@link org.apache.nemo.runtime.common.plan.physical.Task}.
- */
-public abstract class DataTransfer {
- private final String id;
-
- public DataTransfer(final String id) {
- this.id = id;
- }
-
- /**
- * @return ID of the reader/writer.
- */
- public final String getId() {
- return id;
- }
-}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
index 1bd661d..e6f7657 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
@@ -18,171 +18,28 @@
*/
package org.apache.nemo.runtime.executor.datatransfer;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import org.apache.nemo.runtime.common.RuntimeIdManager;
-import org.apache.nemo.common.KeyRange;
-import org.apache.nemo.runtime.common.plan.RuntimeEdge;
-import org.apache.nemo.runtime.common.plan.StageEdge;
-import org.apache.nemo.common.exception.BlockFetchException;
-import org.apache.nemo.common.exception.UnsupportedCommPatternException;
-import org.apache.nemo.common.HashRange;
-import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
import org.apache.nemo.runtime.executor.data.DataUtil;
import java.util.*;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
/**
* Represents the input data transfer to a task.
*/
-public final class InputReader extends DataTransfer {
- private final int dstTaskIndex;
- private final BlockManagerWorker blockManagerWorker;
-
- /**
- * Attributes that specify how we should read the input.
- */
- private final IRVertex srcVertex;
- private final RuntimeEdge runtimeEdge;
-
- public InputReader(final int dstTaskIndex,
- final IRVertex srcVertex,
- final RuntimeEdge runtimeEdge,
- final BlockManagerWorker blockManagerWorker) {
- super(runtimeEdge.getId());
- this.dstTaskIndex = dstTaskIndex;
- this.srcVertex = srcVertex;
- this.runtimeEdge = runtimeEdge;
- this.blockManagerWorker = blockManagerWorker;
- }
-
+public interface InputReader {
/**
* Reads input data depending on the communication pattern of the srcVertex.
*
* @return the read data.
*/
- public List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read() {
- final Optional<CommunicationPatternProperty.Value> comValue =
- runtimeEdge.getPropertyValue(CommunicationPatternProperty.class);
-
- if (comValue.get().equals(CommunicationPatternProperty.Value.OneToOne)) {
- return Collections.singletonList(readOneToOne());
- } else if (comValue.get().equals(CommunicationPatternProperty.Value.BroadCast)) {
- return readBroadcast();
- } else if (comValue.get().equals(CommunicationPatternProperty.Value.Shuffle)) {
- // If the dynamic optimization which detects data skew is enabled, read the data in the assigned range.
- // TODO #492: Modularize the data communication pattern.
- return readDataInRange();
- } else {
- throw new UnsupportedCommPatternException(new Exception("Communication pattern not supported"));
- }
- }
-
- private CompletableFuture<DataUtil.IteratorWithNumBytes> readOneToOne() {
- final String blockIdWildcard = generateWildCardBlockId(dstTaskIndex);
- final Optional<DataStoreProperty.Value> dataStoreProperty
- = runtimeEdge.getPropertyValue(DataStoreProperty.class);
- return blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), HashRange.all());
- }
-
- private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
- final int numSrcTasks = this.getSourceParallelism();
- final Optional<DataStoreProperty.Value> dataStoreProperty
- = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-
- final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
- for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
- final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
- futures.add(blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), HashRange.all()));
- }
-
- return futures;
- }
-
- /**
- * Read data in the assigned range of hash value.
- *
- * @return the list of the completable future of the data.
- */
- private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
- assert (runtimeEdge instanceof StageEdge);
- final Optional<DataStoreProperty.Value> dataStoreProperty
- = runtimeEdge.getPropertyValue(DataStoreProperty.class);
- ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
- final KeyRange hashRangeToRead = ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
- if (hashRangeToRead == null) {
- throw new BlockFetchException(
- new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
- }
-
- final int numSrcTasks = this.getSourceParallelism();
- final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
- for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
- final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
- futures.add(
- blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), hashRangeToRead));
- }
+ List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read();
- return futures;
- }
-
- /**
- * See {@link RuntimeIdManager#generateBlockIdWildcard(String, int)} for information on block wildcards.
- * @param producerTaskIndex to use.
- * @return wildcard block id that corresponds to "ANY" task attempt of the task index.
- */
- private String generateWildCardBlockId(final int producerTaskIndex) {
- final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
- runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
- if (!duplicateDataProperty.isPresent() || duplicateDataProperty.get().getGroupSize() <= 1) {
- return RuntimeIdManager.generateBlockIdWildcard(getId(), producerTaskIndex);
- }
- final String duplicateEdgeId = duplicateDataProperty.get().getRepresentativeEdgeId();
- return RuntimeIdManager.generateBlockIdWildcard(duplicateEdgeId, producerTaskIndex);
- }
-
- public IRVertex getSrcIrVertex() {
- return srcVertex;
- }
+ /**
+ * @return the source {@link IRVertex} whose tasks produce the data this reader reads.
+ */
+ IRVertex getSrcIrVertex();
- /**
- * Get the parallelism of the source task.
- *
- * @return the parallelism of the source task.
- */
- public int getSourceParallelism() {
- return srcVertex.getPropertyValue(ParallelismProperty.class).
- orElseThrow(() -> new RuntimeException("No parallelism property on this edge."));
- }
-
- /**
- * Combine the given list of futures.
- *
- * @param futures to combine.
- * @return the combined iterable of elements.
- * @throws ExecutionException when fail to get results from futures.
- * @throws InterruptedException when interrupted during getting results from futures.
- */
- @VisibleForTesting
- public static Iterator combineFutures(final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures)
- throws ExecutionException, InterruptedException {
- final List concatStreamBase = new ArrayList<>();
- Stream<Object> concatStream = concatStreamBase.stream();
- for (int srcTaskIdx = 0; srcTaskIdx < futures.size(); srcTaskIdx++) {
- final Iterator dataFromATask = futures.get(srcTaskIdx).get();
- final Iterable iterable = () -> dataFromATask;
- concatStream = Stream.concat(concatStream, StreamSupport.stream(iterable.spliterator(), false));
- }
- return concatStream.collect(Collectors.toList()).iterator();
+ /**
+ * Gets the parallelism of the source vertex of the given reader.
+ *
+ * @param inputReader the reader whose source vertex to inspect.
+ * @return the value of the source vertex's {@link ParallelismProperty}.
+ * @throws IllegalStateException if the source vertex has no parallelism property.
+ */
+ static int getSourceParallelism(final InputReader inputReader) {
+ final IRVertex srcVertex = inputReader.getSrcIrVertex();
+ return srcVertex.getPropertyValue(ParallelismProperty.class)
+ .orElseThrow(() -> new IllegalStateException("No parallelism property on source vertex " + srcVertex.getId()));
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
similarity index 61%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferFactory.java
rename to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
index 5f627ac..a90cc79 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferFactory.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
@@ -18,41 +18,51 @@
*/
package org.apache.nemo.runtime.executor.datatransfer;
+import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
+import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
import org.apache.reef.tang.annotations.Parameter;
import javax.inject.Inject;
+import java.util.Optional;
/**
* A factory that produces {@link InputReader} and {@link OutputWriter}.
*/
-public final class DataTransferFactory {
-
+public final class IntermediateDataIOFactory {
+ private final PipeManagerWorker pipeManagerWorker;
private final BlockManagerWorker blockManagerWorker;
private final int hashRangeMultiplier;
+ /**
+ * Injectable constructor.
+ *
+ * @param hashRangeMultiplier the {@link JobConf.HashRangeMultiplier}, passed on to the created writers.
+ * @param blockManagerWorker used by the block-based readers and writers.
+ * @param pipeManagerWorker used by the pipe-based readers and writers.
+ */
@Inject
- private DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) final int hashRangeMultiplier,
- final BlockManagerWorker blockManagerWorker) {
+ private IntermediateDataIOFactory(@Parameter(JobConf.HashRangeMultiplier.class) final int hashRangeMultiplier,
+ final BlockManagerWorker blockManagerWorker,
+ final PipeManagerWorker pipeManagerWorker) {
this.hashRangeMultiplier = hashRangeMultiplier;
this.blockManagerWorker = blockManagerWorker;
+ this.pipeManagerWorker = pipeManagerWorker;
}
/**
* Creates an {@link OutputWriter} between two stages.
+ * A {@link PipeOutputWriter} is created for edges whose {@link DataStoreProperty} is
+ * {@code DataStoreProperty.Value.Pipe}; a {@link BlockOutputWriter} is created otherwise.
*
* @param srcTaskId the id of the source task.
- * @param dstIRVertex the {@link IRVertex} that will take the output data as its input.
- * @param runtimeEdge that connects the srcTask to the tasks belonging to dstIRVertex.
+ * @param runtimeEdge that connects the srcTask to the tasks of the edge's destination vertex;
+ * must be a {@link StageEdge} unless it is a pipe edge.
* @return the {@link OutputWriter} created.
+ * @throws IllegalArgumentException if a non-pipe edge is not a {@link StageEdge}.
*/
public OutputWriter createWriter(final String srcTaskId,
- final IRVertex dstIRVertex,
final RuntimeEdge<?> runtimeEdge) {
- return new OutputWriter(hashRangeMultiplier, srcTaskId, dstIRVertex, runtimeEdge, blockManagerWorker);
+ if (isPipe(runtimeEdge)) {
+ return new PipeOutputWriter(hashRangeMultiplier, srcTaskId, runtimeEdge, pipeManagerWorker);
+ }
+ if (!(runtimeEdge instanceof StageEdge)) {
+ // The block writer takes its destination vertex from StageEdge#getDstIRVertex.
+ throw new IllegalArgumentException("Block output requires a StageEdge, but got edge " + runtimeEdge.getId());
+ }
+ final StageEdge stageEdge = (StageEdge) runtimeEdge;
+ return new BlockOutputWriter(
+ hashRangeMultiplier, srcTaskId, stageEdge.getDstIRVertex(), runtimeEdge, blockManagerWorker);
}
/**
@@ -66,6 +76,15 @@ public final class DataTransferFactory {
public InputReader createReader(final int dstTaskIdx,
final IRVertex srcIRVertex,
final RuntimeEdge runtimeEdge) {
- return new InputReader(dstTaskIdx, srcIRVertex, runtimeEdge, blockManagerWorker);
+ // Pipe edges are read through the PipeManagerWorker; every other data store goes through blocks.
+ if (isPipe(runtimeEdge)) {
+ return new PipeInputReader(dstTaskIdx, srcIRVertex, runtimeEdge, pipeManagerWorker);
+ } else {
+ return new BlockInputReader(dstTaskIdx, srcIRVertex, runtimeEdge, blockManagerWorker);
+ }
+ }
+
+ /**
+ * @param runtimeEdge the edge to inspect.
+ * @return true iff the edge has a {@link DataStoreProperty} equal to {@code DataStoreProperty.Value.Pipe}.
+ */
+ private boolean isPipe(final RuntimeEdge runtimeEdge) {
+ final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+ return dataStoreProperty.isPresent() && dataStoreProperty.get().equals(DataStoreProperty.Value.Pipe);
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
index ebdbd1b..032510a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -19,14 +19,12 @@
package org.apache.nemo.runtime.executor.datatransfer;
import org.apache.nemo.common.KeyExtractor;
-import org.apache.nemo.common.exception.*;
-import org.apache.nemo.common.ir.edge.executionproperty.*;
-import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.exception.UnsupportedPartitionerException;
+import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
-import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
-import org.apache.nemo.runtime.executor.data.block.Block;
+import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.executor.data.partitioner.*;
import java.util.*;
@@ -34,140 +32,52 @@ import java.util.*;
/**
* Represents the output data transfer from a task.
*/
-public final class OutputWriter extends DataTransfer implements AutoCloseable {
- private final RuntimeEdge<?> runtimeEdge;
- private final IRVertex dstIrVertex;
- private final DataStoreProperty.Value blockStoreValue;
- private final BlockManagerWorker blockManagerWorker;
- private final boolean nonDummyBlock;
- private final Block blockToWrite;
- private long writtenBytes;
- private Partitioner partitioner;
-
+public interface OutputWriter {
/**
- * Constructor.
+ * Writes output element depending on the communication pattern of the edge.
*
- * @param hashRangeMultiplier the {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
- * @param srcTaskId the id of the source task.
- * @param dstIrVertex the destination IR vertex.
- * @param runtimeEdge the {@link RuntimeEdge}.
- * @param blockManagerWorker the {@link BlockManagerWorker}.
+ * @param element the element to write.
*/
- OutputWriter(final int hashRangeMultiplier,
- final String srcTaskId,
- final IRVertex dstIrVertex,
- final RuntimeEdge<?> runtimeEdge,
- final BlockManagerWorker blockManagerWorker) {
- super(runtimeEdge.getId());
- this.runtimeEdge = runtimeEdge;
- this.dstIrVertex = dstIrVertex;
- this.blockManagerWorker = blockManagerWorker;
- this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).
- orElseThrow(() -> new RuntimeException("No data store property on the edge"));
+ void write(final Object element);
+
+ /**
+   * @return the total written bytes, or empty if this writer does not track them.
+ */
+ Optional<Long> getWrittenBytes();
+
+  /**
+   * Finishes writing: no element is written after this call, and the written data
+   * (e.g., a committed block or closed pipes) is finalized for the readers.
+   */
+ void close();
+
- // Setup partitioner
- final int dstParallelism = dstIrVertex.getPropertyValue(ParallelismProperty.class).
- orElseThrow(() -> new RuntimeException("No parallelism property on the destination vertex"));
- final Optional<KeyExtractor> keyExtractor = runtimeEdge.getPropertyValue(KeyExtractorProperty.class);
+  /**
+   * Builds the partitioner specified by the {@link PartitionerProperty} of the edge.
+   *
+   * @param runtimeEdge         the edge to build the partitioner for. It must be a {@link StageEdge},
+   *                            because the destination parallelism is read from its destination vertex.
+   * @param hashRangeMultiplier the hash range multiplier, used only by the data-skew hash partitioner.
+   * @return the partitioner for the edge.
+   * @throws UnsupportedPartitionerException if the partitioner type is not supported.
+   */
+ static Partitioner getPartitioner(final RuntimeEdge runtimeEdge,
+ final int hashRangeMultiplier) {
+ final StageEdge stageEdge = (StageEdge) runtimeEdge;
final PartitionerProperty.Value partitionerPropertyValue =
- runtimeEdge.getPropertyValue(PartitionerProperty.class).
- orElseThrow(() -> new RuntimeException("No partitioner property on the edge"));
+ (PartitionerProperty.Value) runtimeEdge.getPropertyValueOrRuntimeException(PartitionerProperty.class);
+    // Fail with a descriptive message (not a bare NoSuchElementException) when the parallelism is missing.
+    final int dstParallelism = stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class)
+        .orElseThrow(() -> new RuntimeException("No parallelism property on the destination vertex"));
+
+ final Partitioner partitioner;
switch (partitionerPropertyValue) {
case IntactPartitioner:
- this.partitioner = new IntactPartitioner();
+ partitioner = new IntactPartitioner();
break;
case HashPartitioner:
- this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.
- orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
+ final KeyExtractor hashKeyExtractor =
+ (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
+ partitioner = new HashPartitioner(dstParallelism, hashKeyExtractor);
break;
case DataSkewHashPartitioner:
- this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.
- orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
+ final KeyExtractor dataSkewKeyExtractor =
+ (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
+ partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, dataSkewKeyExtractor);
break;
case DedicatedKeyPerElementPartitioner:
- this.partitioner = new DedicatedKeyPerElementPartitioner();
+ partitioner = new DedicatedKeyPerElementPartitioner();
break;
default:
throw new UnsupportedPartitionerException(
- new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
+ new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
}
- blockToWrite = blockManagerWorker.createBlock(
- RuntimeIdManager.generateBlockId(getId(), srcTaskId), blockStoreValue);
- final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
- runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
- nonDummyBlock = !duplicateDataProperty.isPresent()
- || duplicateDataProperty.get().getRepresentativeEdgeId().equals(runtimeEdge.getId())
- || duplicateDataProperty.get().getGroupSize() <= 1;
- }
-
- /**
- * Writes output element depending on the communication pattern of the edge.
- *
- * @param element the element to write.
- */
- public void write(final Object element) {
- if (nonDummyBlock) {
- blockToWrite.write(partitioner.partition(element), element);
-
- final DedicatedKeyPerElement dedicatedKeyPerElement =
- partitioner.getClass().getAnnotation(DedicatedKeyPerElement.class);
- if (dedicatedKeyPerElement != null) {
- blockToWrite.commitPartitions();
- }
- } // If else, does not need to write because the data is duplicated.
- }
-
- /**
- * Notifies that all writes for a block is end.
- * Further write about a committed block will throw an exception.
- */
- public void close() {
- // Commit block.
- final DataPersistenceProperty.Value persistence =
- runtimeEdge.getPropertyValue(DataPersistenceProperty.class).
- orElseThrow(() -> new RuntimeException("No data persistence property on the edge"));
-
- final Optional<Map<Integer, Long>> partitionSizeMap = blockToWrite.commit();
- // Return the total size of the committed block.
- if (partitionSizeMap.isPresent()) {
- long blockSizeTotal = 0;
- for (final long partitionSize : partitionSizeMap.get().values()) {
- blockSizeTotal += partitionSize;
- }
- this.writtenBytes = blockSizeTotal;
- } else {
- this.writtenBytes = -1; // no written bytes info.
- }
- blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, getExpectedRead(), persistence);
- }
-
- /**
- * @return the total written bytes.
- */
- public Optional<Long> getWrittenBytes() {
- if (writtenBytes == -1) {
- return Optional.empty();
- } else {
- return Optional.of(writtenBytes);
- }
- }
-
- /**
- * Get the expected number of data read according to the communication pattern of the edge and
- * the parallelism of destination vertex.
- *
- * @return the expected number of data read.
- */
- private int getExpectedRead() {
- final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
- runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
- final int duplicatedDataMultiplier =
- duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
- final int readForABlock = CommunicationPatternProperty.Value.OneToOne.equals(
- runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(
- () -> new RuntimeException("No communication pattern on this edge.")))
- ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(
- () -> new RuntimeException("No parallelism property on the destination vertex."));
- return readForABlock * duplicatedDataMultiplier;
+ return partitioner;
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java
new file mode 100644
index 0000000..eb3999b
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.exception.UnsupportedCommPatternException;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.executor.data.DataUtil;
+import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Represents the input data transfer to a task.
+ */
+public final class PipeInputReader implements InputReader {
+ private final PipeManagerWorker pipeManagerWorker;
+
+ private final int dstTaskIndex;
+
+ /**
+ * Attributes that specify how we should read the input.
+ */
+ private final IRVertex srcVertex;
+ private final RuntimeEdge runtimeEdge;
+
+ PipeInputReader(final int dstTaskIdx,
+ final IRVertex srcIRVertex,
+ final RuntimeEdge runtimeEdge,
+ final PipeManagerWorker pipeManagerWorker) {
+ this.dstTaskIndex = dstTaskIdx;
+ this.srcVertex = srcIRVertex;
+ this.runtimeEdge = runtimeEdge;
+ this.pipeManagerWorker = pipeManagerWorker;
+ }
+
+ @Override
+ public List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read() {
+ final Optional<CommunicationPatternProperty.Value> comValue =
+ runtimeEdge.getPropertyValue(CommunicationPatternProperty.class);
+
+ if (comValue.get().equals(CommunicationPatternProperty.Value.OneToOne)) {
+ return Collections.singletonList(pipeManagerWorker.read(dstTaskIndex, runtimeEdge, dstTaskIndex));
+ } else if (comValue.get().equals(CommunicationPatternProperty.Value.BroadCast)
+ || comValue.get().equals(CommunicationPatternProperty.Value.Shuffle)) {
+ final int numSrcTasks = InputReader.getSourceParallelism(this);
+ final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
+ for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
+ futures.add(pipeManagerWorker.read(srcTaskIdx, runtimeEdge, dstTaskIndex));
+ }
+ return futures;
+ } else {
+ throw new UnsupportedCommPatternException(new Exception("Communication pattern not supported"));
+ }
+ }
+
+ @Override
+ public IRVertex getSrcIrVertex() {
+ return srcVertex;
+ }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
new file mode 100644
index 0000000..a5dbf93
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.DirectByteArrayOutputStream;
+import org.apache.nemo.common.coder.EncoderFactory;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
+import org.apache.nemo.runtime.executor.data.DataUtil;
+import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
+import org.apache.nemo.runtime.executor.data.partitioner.Partitioner;
+import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Represents the output data transfer from a task.
+ */
+public final class PipeOutputWriter implements OutputWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(OutputWriter.class.getName());
+
+ private final String srcTaskId;
+ private final PipeManagerWorker pipeManagerWorker;
+
+ private final Partitioner partitioner;
+ private final RuntimeEdge runtimeEdge;
+
+ private boolean initialized;
+ private Serializer serializer;
+ private List<ByteOutputContext> pipes;
+
+ /**
+ * Constructor.
+ *
+ * @param hashRangeMultiplier the {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
+ * @param srcTaskId the id of the source task.
+ * @param runtimeEdge the {@link RuntimeEdge}.
+ * @param pipeManagerWorker the pipe manager.
+ */
+ PipeOutputWriter(final int hashRangeMultiplier,
+ final String srcTaskId,
+ final RuntimeEdge runtimeEdge,
+ final PipeManagerWorker pipeManagerWorker) {
+ this.initialized = false;
+ this.srcTaskId = srcTaskId;
+ this.pipeManagerWorker = pipeManagerWorker;
+ this.pipeManagerWorker.notifyMaster(runtimeEdge.getId(), RuntimeIdManager.getIndexFromTaskId(srcTaskId));
+ this.partitioner = OutputWriter.getPartitioner(runtimeEdge, hashRangeMultiplier);
+ this.runtimeEdge = runtimeEdge;
+ }
+
+ /**
+ * Writes output element.
+ * @param element the element to write.
+ */
+ @Override
+ public void write(final Object element) {
+ if (!initialized) {
+ doInitialize();
+ }
+
+ try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = getPipeToWrite(element)) {
+ // Serialize (Do not compress)
+ final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
+ final OutputStream wrapped = DataUtil.buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
+ final EncoderFactory.Encoder encoder = serializer.getEncoderFactory().create(wrapped);
+ encoder.encode(element);
+ wrapped.close();
+
+ // Write
+ pipeToWriteTo.write(bytesOutputStream.getBufDirectly());
+ } catch (IOException e) {
+ throw new RuntimeException(e); // For now we crash the executor on IOException
+ }
+ }
+
+ @Override
+ public Optional<Long> getWrittenBytes() {
+ return Optional.empty();
+ }
+
+ @Override
+ public void close() {
+ if (!initialized) {
+ // In order to "wire-up" with the receivers waiting for us.:w
+ doInitialize();
+ }
+
+ pipes.forEach(pipe -> {
+ try {
+ pipe.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ private void doInitialize() {
+ initialized = true;
+
+ // Blocking call
+ this.pipes = pipeManagerWorker.getOutputContexts(runtimeEdge, RuntimeIdManager.getIndexFromTaskId(srcTaskId));
+ this.serializer = pipeManagerWorker.getSerializer(runtimeEdge.getId());
+ }
+
+ private ByteOutputContext.ByteOutputStream getPipeToWrite(final Object element) throws IOException {
+ return runtimeEdge.getPropertyValue(CommunicationPatternProperty.class)
+ .get()
+ .equals(CommunicationPatternProperty.Value.OneToOne)
+ ? pipes.get(0).newOutputStream()
+ : pipes.get((int) partitioner.partition(element)).newOutputStream();
+ }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 6098b30..e546ee7 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -69,6 +69,7 @@ class ParentTaskDataFetcher extends DataFetcher {
}
while (true) {
+
// This iterator has the element
if (this.currentIterator.hasNext()) {
return this.currentIterator.next();
@@ -84,6 +85,7 @@ class ParentTaskDataFetcher extends DataFetcher {
// We've consumed all the iterators
break;
}
+
}
} catch (final Throwable e) {
// Any failure is caught and thrown as an IOException, so that the task is retried.
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 449542a..98600cd 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -90,14 +90,14 @@ public final class TaskExecutor {
* @param task Task with information needed during execution.
* @param irVertexDag A DAG of vertices.
* @param taskStateManager State manager for this Task.
- * @param dataTransferFactory For reading from/writing to data to other tasks.
+ * @param intermediateDataIOFactory For reading from/writing to data to other tasks.
* @param broadcastManagerWorker For broadcasts.
* @param metricMessageSender For sending metric with execution stats to Master.
*/
public TaskExecutor(final Task task,
final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
final TaskStateManager taskStateManager,
- final DataTransferFactory dataTransferFactory,
+ final IntermediateDataIOFactory intermediateDataIOFactory,
final BroadcastManagerWorker broadcastManagerWorker,
final MetricMessageSender metricMessageSender,
final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
@@ -117,7 +117,7 @@ public final class TaskExecutor {
this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
// Prepare data structures
- final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, dataTransferFactory);
+ final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, intermediateDataIOFactory);
this.nonBroadcastDataFetchers = pair.left();
this.sortedHarnesses = pair.right();
}
@@ -141,11 +141,13 @@ public final class TaskExecutor {
*
* @param task task.
* @param irVertexDag dag.
+ * @param intermediateDataIOFactory intermediate IO.
* @return fetchers and harnesses.
*/
- private Pair<List<DataFetcher>, List<VertexHarness>> prepare(final Task task,
- final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
- final DataTransferFactory dataTransferFactory) {
+ private Pair<List<DataFetcher>, List<VertexHarness>> prepare(
+ final Task task,
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+ final IntermediateDataIOFactory intermediateDataIOFactory) {
final int taskIndex = RuntimeIdManager.getIndexFromTaskId(task.getTaskId());
// Traverse in a reverse-topological order to ensure that each visited vertex's children vertices exist.
@@ -164,12 +166,12 @@ public final class TaskExecutor {
final Map<String, List<OperatorVertex>> internalAdditionalOutputMap =
getInternalAdditionalOutputMap(irVertex, irVertexDag);
final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
- getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), dataTransferFactory);
+ getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
// Main outputs
final List<OperatorVertex> internalMainOutputs = getInternalMainOutputs(irVertex, irVertexDag);
final List<OutputWriter> externalMainOutputs =
- getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), dataTransferFactory);
+ getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
final OutputCollector outputCollector;
@@ -209,7 +211,7 @@ public final class TaskExecutor {
.filter(stageEdge -> stageEdge.getPropertyValue(BroadcastVariableIdProperty.class).isPresent())
.collect(Collectors.toList());
final List<InputReader> broadcastReaders =
- getParentTaskReaders(taskIndex, broadcastInEdges, dataTransferFactory);
+ getParentTaskReaders(taskIndex, broadcastInEdges, intermediateDataIOFactory);
if (broadcastInEdges.size() != broadcastReaders.size()) {
throw new IllegalStateException(broadcastInEdges.toString() + ", " + broadcastReaders.toString());
}
@@ -224,7 +226,7 @@ public final class TaskExecutor {
final List<StageEdge> nonBroadcastInEdges = new ArrayList<>(inEdgesForThisVertex);
nonBroadcastInEdges.removeAll(broadcastInEdges);
final List<InputReader> nonBroadcastReaders =
- getParentTaskReaders(taskIndex, nonBroadcastInEdges, dataTransferFactory);
+ getParentTaskReaders(taskIndex, nonBroadcastInEdges, intermediateDataIOFactory);
nonBroadcastReaders.forEach(parentTaskReader -> nonBroadcastDataFetcherList.add(
new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
new DataFetcherOutputCollector((OperatorVertex) irVertex))));
@@ -456,7 +458,7 @@ public final class TaskExecutor {
private Map<String, List<OutputWriter>> getExternalAdditionalOutputMap(
final IRVertex irVertex,
final List<StageEdge> outEdgesToChildrenTasks,
- final DataTransferFactory dataTransferFactory) {
+ final IntermediateDataIOFactory intermediateDataIOFactory) {
// Add all inter-task additional tags to additional output map.
final Map<String, List<OutputWriter>> map = new HashMap<>();
@@ -466,7 +468,7 @@ public final class TaskExecutor {
.filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
.map(edge ->
Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(),
- dataTransferFactory.createWriter(taskId, edge.getDstIRVertex(), edge)))
+ intermediateDataIOFactory.createWriter(taskId, edge)))
.forEach(pair -> {
map.putIfAbsent(pair.left(), new ArrayList<>());
map.get(pair.left()).add(pair.right());
@@ -495,7 +497,7 @@ public final class TaskExecutor {
}
private List<OperatorVertex> getInternalMainOutputs(final IRVertex irVertex,
- final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
.stream()
.filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
@@ -508,18 +510,18 @@ public final class TaskExecutor {
*
* @param irVertex source irVertex
* @param outEdgesToChildrenTasks outgoing edges to child tasks
- * @param dataTransferFactory dataTransferFactory
+ * @param intermediateDataIOFactory intermediateDataIOFactory
* @return OutputWriters for main children tasks
*/
private List<OutputWriter> getExternalMainOutputs(final IRVertex irVertex,
- final List<StageEdge> outEdgesToChildrenTasks,
- final DataTransferFactory dataTransferFactory) {
+ final List<StageEdge> outEdgesToChildrenTasks,
+ final IntermediateDataIOFactory intermediateDataIOFactory) {
return outEdgesToChildrenTasks
.stream()
+        // Main outputs: outgoing edges that start at this vertex and carry no additional-output tag.
.filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
.filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
- .map(outEdgeForThisVertex -> dataTransferFactory
- .createWriter(taskId, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex))
+        // One OutputWriter per remaining edge, created by the intermediate data IO factory.
+ .map(outEdgeForThisVertex -> intermediateDataIOFactory
+ .createWriter(taskId, outEdgeForThisVertex))
.collect(Collectors.toList());
}
@@ -539,10 +541,10 @@ public final class TaskExecutor {
private List<InputReader> getParentTaskReaders(final int taskIndex,
final List<StageEdge> inEdgesFromParentTasks,
- final DataTransferFactory dataTransferFactory) {
+ final IntermediateDataIOFactory intermediateDataIOFactory) {
+    // One InputReader per incoming edge, built for this task's index and the edge's source vertex.
return inEdgesFromParentTasks
.stream()
- .map(inEdgeForThisVertex -> dataTransferFactory
+ .map(inEdgeForThisVertex -> intermediateDataIOFactory
.createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(), inEdgeForThisVertex))
.collect(Collectors.toList());
}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 933a414..7befbe0 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -45,11 +45,13 @@ import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.executor.Executor;
import org.apache.nemo.runtime.executor.TestUtil;
import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
+import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.data.SerializerManager;
import org.apache.nemo.runtime.master.*;
import org.apache.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
-import org.apache.beam.sdk.values.KV;
import org.apache.commons.io.FileUtils;
+import org.apache.nemo.runtime.master.scheduler.BatchScheduler;
+import org.apache.nemo.runtime.master.scheduler.Scheduler;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.io.network.naming.NameResolverConfiguration;
import org.apache.reef.io.network.naming.NameServer;
@@ -69,13 +71,15 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.io.File;
import java.io.IOException;
-import java.io.Serializable;
import java.util.*;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import static org.apache.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY;
import static org.apache.nemo.runtime.common.RuntimeTestUtil.getRangedNumList;
@@ -83,6 +87,7 @@ import static org.apache.nemo.runtime.common.RuntimeTestUtil.flatten;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
/**
* Tests {@link InputReader} and {@link OutputWriter}.
@@ -116,7 +121,7 @@ public final class DataTransferTest {
private BlockManagerMaster master;
private BlockManagerWorker worker1;
- private DataTransferFactory transferFactory;
+ private IntermediateDataIOFactory transferFactory;
private BlockManagerWorker worker2;
private HashMap<BlockManagerWorker, SerializerManager> serializerManagers = new HashMap<>();
@@ -140,6 +145,7 @@ public final class DataTransferTest {
injector.bindVolatileParameter(JobConf.DAGDirectory.class, EMPTY_DAG_DIRECTORY);
// Necessary for wiring up the message environments
+ injector.bindVolatileInstance(Scheduler.class, injector.getInstance(BatchScheduler.class));
injector.getInstance(RuntimeMaster.class);
final BlockManagerMaster master = injector.getInstance(BlockManagerMaster.class);
@@ -147,7 +153,7 @@ public final class DataTransferTest {
nameClientInjector.bindVolatileParameter(JobConf.JobId.class, "data transfer test");
this.master = master;
- final Pair<BlockManagerWorker, DataTransferFactory> pair1 = createWorker(
+ final Pair<BlockManagerWorker, IntermediateDataIOFactory> pair1 = createWorker(
EXECUTOR_ID_PREFIX + executorCount.getAndIncrement(), dispatcherInjector, nameClientInjector);
this.worker1 = pair1.left();
this.transferFactory = pair1.right();
@@ -161,7 +167,7 @@ public final class DataTransferTest {
FileUtils.deleteDirectory(new File(TMP_REMOTE_FILE_DIRECTORY));
}
- private Pair<BlockManagerWorker, DataTransferFactory> createWorker(
+ private Pair<BlockManagerWorker, IntermediateDataIOFactory> createWorker(
final String executorId,
final Injector dispatcherInjector,
final Injector nameClientInjector) throws InjectionException {
@@ -180,12 +186,12 @@ public final class DataTransferTest {
injector.bindVolatileParameter(JobConf.GlusterVolumeDirectory.class, TMP_REMOTE_FILE_DIRECTORY);
final BlockManagerWorker blockManagerWorker;
final SerializerManager serializerManager;
- final DataTransferFactory dataTransferFactory;
+ final IntermediateDataIOFactory intermediateDataIOFactory;
try {
blockManagerWorker = injector.getInstance(BlockManagerWorker.class);
serializerManager = injector.getInstance(SerializerManager.class);
serializerManagers.put(blockManagerWorker, serializerManager);
- dataTransferFactory = injector.getInstance(DataTransferFactory.class);
+ intermediateDataIOFactory = injector.getInstance(IntermediateDataIOFactory.class);
} catch (final InjectionException e) {
throw new RuntimeException(e);
}
@@ -193,7 +199,7 @@ public final class DataTransferTest {
// Unused, but necessary for wiring up the message environments
injector.getInstance(Executor.class);
- return Pair.of(blockManagerWorker, dataTransferFactory);
+ return Pair.of(blockManagerWorker, intermediateDataIOFactory);
}
private Injector createNameClientInjector() {
@@ -320,11 +326,9 @@ public final class DataTransferTest {
final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
final RuntimeEdge dummyEdge;
- final IRVertex srcMockVertex = mock(IRVertex.class);
- final IRVertex dstMockVertex = mock(IRVertex.class);
final Stage srcStage = setupStages("srcStage" + testIndex);
final Stage dstStage = setupStages("dstStage" + testIndex);
- dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex, srcStage, dstStage);
+ dummyEdge = new StageEdge(edgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage);
// Initialize states in Master
TestUtil.generateTaskIds(srcStage).forEach(srcTaskId -> {
@@ -336,7 +340,7 @@ public final class DataTransferTest {
final List<List> dataWrittenList = new ArrayList<>();
TestUtil.generateTaskIds(srcStage).forEach(srcTaskId -> {
final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
- final OutputWriter writer = transferFactory.createWriter(srcTaskId, dstVertex, dummyEdge);
+ final OutputWriter writer = transferFactory.createWriter(srcTaskId, dummyEdge);
dataWritten.iterator().forEachRemaining(writer::write);
writer.close();
dataWrittenList.add(dataWritten);
@@ -346,13 +350,13 @@ public final class DataTransferTest {
final List<List> dataReadList = new ArrayList<>();
IntStream.range(0, PARALLELISM_TEN).forEach(dstTaskIndex -> {
final InputReader reader =
- new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
+ new BlockInputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
- assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
+ assertEquals(PARALLELISM_TEN, InputReader.getSourceParallelism(reader));
final List dataRead = new ArrayList<>();
try {
- InputReader.combineFutures(reader.read()).forEachRemaining(dataRead::add);
+ combineFutures(reader.read()).forEachRemaining(dataRead::add);
} catch (final Exception e) {
throw new RuntimeException(e);
}
@@ -411,13 +415,10 @@ public final class DataTransferTest {
final RuntimeEdge dummyEdge, dummyEdge2;
final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
- final IRVertex srcMockVertex = mock(IRVertex.class);
- final IRVertex dstMockVertex = mock(IRVertex.class);
final Stage srcStage = setupStages("srcStage" + testIndex);
final Stage dstStage = setupStages("dstStage" + testIndex);
- dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex, srcStage, dstStage);
- final IRVertex dstMockVertex2 = mock(IRVertex.class);
- dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2, srcStage, dstStage);
+ dummyEdge = new StageEdge(edgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage);
+ dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcVertex, dstVertex, srcStage, dstStage);
// Initialize states in Master
TestUtil.generateTaskIds(srcStage).forEach(srcTaskId -> {
final String blockId = RuntimeIdManager.generateBlockId(edgeId, srcTaskId);
@@ -429,12 +430,12 @@ public final class DataTransferTest {
final List<List> dataWrittenList = new ArrayList<>();
TestUtil.generateTaskIds(srcStage).forEach(srcTaskId -> {
final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
- final OutputWriter writer = transferFactory.createWriter(srcTaskId, dstVertex, dummyEdge);
+ final OutputWriter writer = transferFactory.createWriter(srcTaskId, dummyEdge);
dataWritten.iterator().forEachRemaining(writer::write);
writer.close();
dataWrittenList.add(dataWritten);
- final OutputWriter writer2 = transferFactory.createWriter(srcTaskId, dstVertex, dummyEdge2);
+ final OutputWriter writer2 = transferFactory.createWriter(srcTaskId, dummyEdge2);
dataWritten.iterator().forEachRemaining(writer2::write);
writer2.close();
});
@@ -444,17 +445,17 @@ public final class DataTransferTest {
final List<List> dataReadList2 = new ArrayList<>();
IntStream.range(0, PARALLELISM_TEN).forEach(dstTaskIndex -> {
final InputReader reader =
- new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
+ new BlockInputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
final InputReader reader2 =
- new InputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver);
+ new BlockInputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver);
- assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
+ assertEquals(PARALLELISM_TEN, InputReader.getSourceParallelism(reader));
- assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
+ assertEquals(PARALLELISM_TEN, InputReader.getSourceParallelism(reader));
final List dataRead = new ArrayList<>();
try {
- InputReader.combineFutures(reader.read()).forEachRemaining(dataRead::add);
+ combineFutures(reader.read()).forEachRemaining(dataRead::add);
} catch (final Exception e) {
throw new RuntimeException(e);
}
@@ -462,7 +463,7 @@ public final class DataTransferTest {
final List dataRead2 = new ArrayList<>();
try {
- InputReader.combineFutures(reader2.read()).forEachRemaining(dataRead2::add);
+ combineFutures(reader2.read()).forEachRemaining(dataRead2::add);
} catch (final Exception e) {
throw new RuntimeException(e);
}
@@ -534,5 +535,25 @@ public final class DataTransferTest {
stageExecutionProperty.put(ScheduleGroupProperty.of(0));
return new Stage(stageId, emptyDag, stageExecutionProperty, Collections.emptyList());
}
+
+  /**
+   * Waits for each of the given futures, in list order, and concatenates the elements they produce.
+   *
+   * @param futures to combine.
+   * @return an iterator over all elements, ordered by the position of their future in {@code futures}.
+   * @throws ExecutionException when fail to get results from futures.
+   * @throws InterruptedException when interrupted during getting results from futures.
+   */
+  private Iterator combineFutures(final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures)
+      throws ExecutionException, InterruptedException {
+    // Collect straight into a list instead of nesting Stream.concat once per future:
+    // the result is materialized anyway, and repeated concatenation builds deep call chains.
+    final List<Object> combined = new ArrayList<>();
+    for (final CompletableFuture<DataUtil.IteratorWithNumBytes> future : futures) {
+      future.get().forEachRemaining(combined::add);
+    }
+    return combined.iterator();
+  }
}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 705f833..6cbf694 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -22,9 +22,11 @@ import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.punctuation.Finishmark;
import org.apache.nemo.runtime.executor.data.DataUtil;
+import org.apache.nemo.runtime.executor.datatransfer.BlockInputReader;
import org.apache.nemo.runtime.executor.datatransfer.InputReader;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -42,7 +44,7 @@ import static org.mockito.Mockito.when;
* Tests {@link ParentTaskDataFetcher}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({InputReader.class, VertexHarness.class})
+@PrepareForTest({InputReader.class, VertexHarness.class, BlockInputReader.class})
public final class ParentTaskDataFetcherTest {
@Test(timeout=5000)
@@ -125,7 +127,7 @@ public final class ParentTaskDataFetcherTest {
}
private InputReader generateInputReader(final CompletableFuture completableFuture) {
- final InputReader inputReader = mock(InputReader.class);
+ final InputReader inputReader = mock(InputReader.class, Mockito.CALLS_REAL_METHODS);
when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));
return inputReader;
}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index e0c4f95..b7da5fa 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -33,6 +33,7 @@ import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
import org.apache.nemo.common.ir.vertex.InMemorySourceVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
@@ -47,9 +48,7 @@ import org.apache.nemo.runtime.executor.MetricMessageSender;
import org.apache.nemo.runtime.executor.TaskStateManager;
import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
import org.apache.nemo.runtime.executor.data.DataUtil;
-import org.apache.nemo.runtime.executor.datatransfer.DataTransferFactory;
-import org.apache.nemo.runtime.executor.datatransfer.InputReader;
-import org.apache.nemo.runtime.executor.datatransfer.OutputWriter;
+import org.apache.nemo.runtime.executor.datatransfer.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -78,8 +77,8 @@ import static org.mockito.Mockito.*;
* Tests {@link TaskExecutor}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({InputReader.class, OutputWriter.class, DataTransferFactory.class, BroadcastManagerWorker.class,
- TaskStateManager.class, StageEdge.class, PersistentConnectionToMasterMap.class, Stage.class, IREdge.class})
+@PrepareForTest({InputReader.class, OutputWriter.class, IntermediateDataIOFactory.class, BroadcastManagerWorker.class,
+ TaskStateManager.class, StageEdge.class, PersistentConnectionToMasterMap.class, Stage.class, IREdge.class})
public final class TaskExecutorTest {
private static final AtomicInteger RUNTIME_EDGE_ID = new AtomicInteger(0);
private static final int DATA_SIZE = 100;
@@ -90,7 +89,7 @@ public final class TaskExecutorTest {
private List<Integer> elements;
private Map<String, List> runtimeEdgeToOutputData;
- private DataTransferFactory dataTransferFactory;
+ private IntermediateDataIOFactory intermediateDataIOFactory;
private BroadcastManagerWorker broadcastManagerWorker;
private TaskStateManager taskStateManager;
private MetricMessageSender metricMessageSender;
@@ -110,11 +109,11 @@ public final class TaskExecutorTest {
// Mock a TaskStateManager. It accumulates the state change into a list.
taskStateManager = mock(TaskStateManager.class);
- // Mock a DataTransferFactory.
+ // Mock an IntermediateDataIOFactory.
runtimeEdgeToOutputData = new HashMap<>();
- dataTransferFactory = mock(DataTransferFactory.class);
- when(dataTransferFactory.createReader(anyInt(), any(), any())).then(new ParentTaskReaderAnswer());
- when(dataTransferFactory.createWriter(any(), any(), any())).then(new ChildTaskWriterAnswer());
+ intermediateDataIOFactory = mock(IntermediateDataIOFactory.class);
+ when(intermediateDataIOFactory.createReader(anyInt(), any(), any())).then(new ParentTaskReaderAnswer());
+ when(intermediateDataIOFactory.createWriter(any(), any())).then(new ChildTaskWriterAnswer());
// Mock a MetricMessageSender.
metricMessageSender = mock(MetricMessageSender.class);
@@ -532,9 +531,9 @@ public final class TaskExecutorTest {
}
final InputReader inputReader = mock(InputReader.class);
final IRVertex srcVertex = (IRVertex) invocationOnMock.getArgument(1);
+ srcVertex.setProperty(ParallelismProperty.of(SOURCE_PARALLELISM));
when(inputReader.getSrcIrVertex()).thenReturn(srcVertex);
when(inputReader.read()).thenReturn(inputFutures);
- when(inputReader.getSourceParallelism()).thenReturn(SOURCE_PARALLELISM);
return inputReader;
}
}
@@ -547,7 +546,7 @@ public final class TaskExecutorTest {
@Override
public OutputWriter answer(final InvocationOnMock invocationOnMock) throws Throwable {
final Object[] args = invocationOnMock.getArguments();
- final RuntimeEdge runtimeEdge = (RuntimeEdge) args[2];
+ final RuntimeEdge runtimeEdge = (RuntimeEdge) args[1];
final OutputWriter outputWriter = mock(OutputWriter.class);
doAnswer(new Answer() {
@Override
@@ -689,7 +688,7 @@ public final class TaskExecutorTest {
}
private TaskExecutor getTaskExecutor(final Task task, final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag) {
- return new TaskExecutor(task, taskDag, taskStateManager, dataTransferFactory, broadcastManagerWorker,
+ return new TaskExecutor(task, taskDag, taskStateManager, intermediateDataIOFactory, broadcastManagerWorker,
metricMessageSender, persistentConnectionToMasterMap);
}
}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PipeManagerMaster.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PipeManagerMaster.java
new file mode 100644
index 0000000..f2c5d18
--- /dev/null
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PipeManagerMaster.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.master;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.exception.IllegalMessageException;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageContext;
+import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.MessageListener;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Master-side pipe manager.
+ */
+@ThreadSafe
+@DriverSide
+public final class PipeManagerMaster {
+ private static final Logger LOG = LoggerFactory.getLogger(PipeManagerMaster.class.getName());
+ private final Map<Pair<String, Long>, String> runtimeEdgeSrcIndexToExecutor;
+ private final Map<Pair<String, Long>, Lock> runtimeEdgeSrcIndexToLock;
+ private final Map<Pair<String, Long>, Condition> runtimeEdgeSrcIndexToCondition;
+ private final ExecutorService waitForPipe;
+
+ /**
+ * Constructor.
+ * @param masterMessageEnvironment the message environment.
+ */
+ @Inject
+ private PipeManagerMaster(final MessageEnvironment masterMessageEnvironment) {
+ masterMessageEnvironment.setupListener(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID,
+ new PipeManagerMasterControlMessageReceiver());
+ this.runtimeEdgeSrcIndexToExecutor = new ConcurrentHashMap<>();
+ this.runtimeEdgeSrcIndexToLock = new ConcurrentHashMap<>();
+ this.runtimeEdgeSrcIndexToCondition = new ConcurrentHashMap<>();
+ this.waitForPipe = Executors.newCachedThreadPool();
+ }
+
+ public void onTaskScheduled(final String edgeId, final long srcIndex) {
+ final Pair<String, Long> keyPair = Pair.of(edgeId, srcIndex);
+ if (null != runtimeEdgeSrcIndexToLock.put(keyPair, new ReentrantLock())) {
+ throw new IllegalStateException(keyPair.toString());
+ }
+ if (null != runtimeEdgeSrcIndexToCondition.put(keyPair, runtimeEdgeSrcIndexToLock.get(keyPair).newCondition())) {
+ throw new IllegalStateException(keyPair.toString());
+ }
+ }
+
+ /**
+ * Handler for control messages received.
+ */
+ public final class PipeManagerMasterControlMessageReceiver implements MessageListener<ControlMessage.Message> {
+ @Override
+ public void onMessage(final ControlMessage.Message message) {
+ switch (message.getType()) {
+ case PipeInit:
+ final ControlMessage.PipeInitMessage pipeInitMessage = message.getPipeInitMsg();
+ final Pair<String, Long> keyPair =
+ Pair.of(pipeInitMessage.getRuntimeEdgeId(), pipeInitMessage.getSrcTaskIndex());
+
+ // Allow to put at most once
+ final Lock lock = runtimeEdgeSrcIndexToLock.get(keyPair);
+ lock.lock();
+ try {
+ if (null != runtimeEdgeSrcIndexToExecutor.put(keyPair, pipeInitMessage.getExecutorId())) {
+ throw new RuntimeException(keyPair.toString());
+ }
+ runtimeEdgeSrcIndexToCondition.get(keyPair).signalAll();
+ } finally {
+ lock.unlock();
+ }
+
+ break;
+ default:
+ throw new IllegalMessageException(new Exception(message.toString()));
+ }
+
+
+ }
+
+ @Override
+ public void onMessageWithContext(final ControlMessage.Message message, final MessageContext messageContext) {
+ switch (message.getType()) {
+ case RequestPipeLoc:
+ final ControlMessage.RequestPipeLocationMessage pipeLocRequest = message.getRequestPipeLocMsg();
+
+ // Use the executor service to avoid blocking the networking thread.
+ waitForPipe.submit(() -> {
+ final Pair<String, Long> keyPair =
+ Pair.of(pipeLocRequest.getRuntimeEdgeId(), pipeLocRequest.getSrcTaskIndex());
+
+ final Lock lock = runtimeEdgeSrcIndexToLock.get(keyPair);
+ lock.lock();
+ try {
+ if (!runtimeEdgeSrcIndexToExecutor.containsKey(keyPair)) {
+ runtimeEdgeSrcIndexToCondition.get(keyPair).await();
+ }
+
+ final String location = runtimeEdgeSrcIndexToExecutor.get(keyPair);
+ if (location == null) {
+ throw new IllegalStateException(keyPair.toString());
+ }
+
+ // Reply the location
+ messageContext.reply(
+ ControlMessage.Message.newBuilder()
+ .setId(RuntimeIdManager.generateMessageId())
+ .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+ .setType(ControlMessage.MessageType.PipeLocInfo)
+ .setPipeLocInfoMsg(ControlMessage.PipeLocationInfoMessage.newBuilder()
+ .setRequestId(message.getId())
+ .setExecutorId(location)
+ .build())
+ .build());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ lock.unlock();
+ }
+ });
+
+ break;
+ default:
+ throw new IllegalMessageException(new Exception(message.toString()));
+ }
+ }
+ }
+}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/Scheduler.java
index 94289e8..b8d00b3 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/Scheduler.java
@@ -22,7 +22,6 @@ import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.tang.annotations.DefaultImplementation;
import javax.annotation.Nullable;
@@ -33,7 +32,6 @@ import javax.annotation.Nullable;
* Other scheduler-related classes that are accessed by only one of the two threads are not synchronized(NotThreadSafe).
*/
@DriverSide
-@DefaultImplementation(BatchScheduler.class)
public interface Scheduler {
/**
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
index 48f5e8c..71f1da2 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
@@ -18,7 +18,6 @@
*/
package org.apache.nemo.runtime.master.scheduler;
-import com.google.common.collect.Lists;
import org.apache.nemo.common.exception.UnknownExecutionStateException;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.runtime.common.RuntimeIdManager;
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
+import javax.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -42,7 +42,7 @@ import java.util.stream.Collectors;
/**
* A simple scheduler for streaming workloads.
* - Keeps track of new executors
- * - Schedules all tasks in a reverse topological order.
+ * - Schedules all tasks in the plan at once.
* - Crashes the system upon any other events (should be fixed in the future)
* - Never stops running.
*/
@@ -55,15 +55,19 @@ public final class StreamingScheduler implements Scheduler {
private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
private final ExecutorRegistry executorRegistry;
private final PlanStateManager planStateManager;
+ private final PipeManagerMaster pipeManagerMaster;
+ @Inject
StreamingScheduler(final TaskDispatcher taskDispatcher,
final PendingTaskCollectionPointer pendingTaskCollectionPointer,
final ExecutorRegistry executorRegistry,
- final PlanStateManager planStateManager) {
+ final PlanStateManager planStateManager,
+ final PipeManagerMaster pipeManagerMaster) {
this.taskDispatcher = taskDispatcher;
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
this.executorRegistry = executorRegistry;
this.planStateManager = planStateManager;
+ this.pipeManagerMaster = pipeManagerMaster;
}
@Override
@@ -75,8 +79,8 @@ public final class StreamingScheduler implements Scheduler {
planStateManager.storeJSON("submitted");
// Prepare tasks
- final List<Stage> reverseTopoStages = Lists.reverse(submittedPhysicalPlan.getStageDAG().getTopologicalSort());
- final List<Task> reverseTopoTasks = reverseTopoStages.stream().flatMap(stageToSchedule -> {
+ final List<Stage> allStages = submittedPhysicalPlan.getStageDAG().getTopologicalSort();
+ final List<Task> allTasks = allStages.stream().flatMap(stageToSchedule -> {
// Helper variables for this stage
final List<StageEdge> stageIncomingEdges =
submittedPhysicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
@@ -85,6 +89,11 @@ public final class StreamingScheduler implements Scheduler {
final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
+ taskIdsToSchedule.forEach(taskId -> {
+ final int index = RuntimeIdManager.getIndexFromTaskId(taskId);
+ stageOutgoingEdges.forEach(outEdge -> pipeManagerMaster.onTaskScheduled(outEdge.getId(), index));
+ });
+
// Create tasks of this stage
return taskIdsToSchedule.stream().map(taskId -> new Task(
submittedPhysicalPlan.getPlanId(),
@@ -97,7 +106,8 @@ public final class StreamingScheduler implements Scheduler {
}).collect(Collectors.toList());
// Schedule everything at once
- pendingTaskCollectionPointer.setToOverwrite(reverseTopoTasks);
+ pendingTaskCollectionPointer.setToOverwrite(allTasks);
+ taskDispatcher.onNewPendingTaskCollectionAvailable();
}
@Override
@@ -113,11 +123,15 @@ public final class StreamingScheduler implements Scheduler {
final TaskState.State newState,
@Nullable final String vertexPutOnHold,
final TaskState.RecoverableTaskFailureCause failureCause) {
+ planStateManager.onTaskStateChanged(taskId, newState);
+
switch (newState) {
case COMPLETE:
- case SHOULD_RETRY:
+ // Do nothing.
+ break;
case ON_HOLD:
case FAILED:
+ case SHOULD_RETRY:
// TODO #226: StreamingScheduler Fault Tolerance
throw new UnsupportedOperationException();
case READY:
@@ -137,6 +151,7 @@ public final class StreamingScheduler implements Scheduler {
@Override
public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
+ taskDispatcher.onExecutorSlotAvailable();
executorRegistry.registerExecutor(executorRepresenter);
}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
index fa0caf5..c50f1e9 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -50,7 +50,7 @@ import javax.inject.Inject;
final class TaskDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(TaskDispatcher.class.getName());
private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
- private final ExecutorService schedulerThread;
+ private final ExecutorService dispatcherThread;
private final PlanStateManager planStateManager;
private boolean isSchedulerRunning;
private boolean isTerminated;
@@ -67,7 +67,7 @@ final class TaskDispatcher {
final ExecutorRegistry executorRegistry,
final PlanStateManager planStateManager) {
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
- this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
+ this.dispatcherThread = Executors.newSingleThreadExecutor(runnable ->
new Thread(runnable, "TaskDispatcher thread"));
this.planStateManager = planStateManager;
this.isSchedulerRunning = false;
@@ -81,7 +81,7 @@ final class TaskDispatcher {
* A separate thread is run to dispatch tasks to executors.
* See comments in the {@link Scheduler} for avoiding race conditions.
*/
- private final class SchedulerThread implements Runnable {
+ private final class TaskDispatcherThread implements Runnable {
@Override
public void run() {
while (!isTerminated) {
@@ -167,8 +167,8 @@ final class TaskDispatcher {
*/
void run() {
if (!isTerminated && !isSchedulerRunning) {
- schedulerThread.execute(new SchedulerThread());
- schedulerThread.shutdown();
+ dispatcherThread.execute(new TaskDispatcherThread());
+ dispatcherThread.shutdown();
isSchedulerRunning = true;
}
}
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
index 56e69eb..34a1cff 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
@@ -23,6 +23,7 @@ import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.TestPlanGenerator;
import org.apache.nemo.runtime.master.BlockManagerMaster;
+import org.apache.nemo.runtime.master.PipeManagerMaster;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
import org.junit.Before;
@@ -46,7 +47,8 @@ import static org.mockito.Mockito.when;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({BlockManagerMaster.class, PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class,
- TaskDispatcher.class, PendingTaskCollectionPointer.class, ExecutorRegistry.class, PlanStateManager.class})
+ TaskDispatcher.class, PendingTaskCollectionPointer.class, ExecutorRegistry.class, PlanStateManager.class,
+ PipeManagerMaster.class})
public final class StreamingSchedulerTest {
private static final int ATTEMPTS_PER_STAGE = 2;
@@ -60,13 +62,15 @@ public final class StreamingSchedulerTest {
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
final ExecutorRegistry executorRegistry = mock(ExecutorRegistry.class);
final PlanStateManager planStateManager = mock(PlanStateManager.class);
+ final PipeManagerMaster pipeManagerMaster = mock(PipeManagerMaster.class);
when(planStateManager.getTaskAttemptsToSchedule(any())).thenAnswer(invocationOnMock -> {
final String stageId = invocationOnMock.getArgument(0);
return generateAttempts(stageId);
});
- scheduler = new StreamingScheduler(taskDispatcher, pendingTaskCollectionPointer, executorRegistry, planStateManager);
+ scheduler = new StreamingScheduler(
+ taskDispatcher, pendingTaskCollectionPointer, executorRegistry, planStateManager, pipeManagerMaster);
}
private List<String> generateAttempts(final String stageId) {
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
index 11022b1..7af0cbe 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -246,7 +246,7 @@ public final class TaskRetryTest {
injector.bindVolatileInstance(UpdatePhysicalPlanEventHandler.class, mock(UpdatePhysicalPlanEventHandler.class));
injector.bindVolatileInstance(SchedulingConstraintRegistry.class, mock(SchedulingConstraintRegistry.class));
planStateManager = injector.getInstance(PlanStateManager.class);
- scheduler = injector.getInstance(Scheduler.class);
+ scheduler = injector.getInstance(BatchScheduler.class);
blockManagerMaster = injector.getInstance(BlockManagerMaster.class);
scheduler.schedulePlan(plan, MAX_SCHEDULE_ATTEMPT);