You are viewing a plain-text version of this content; the canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/28 06:38:21 UTC
[incubator-nemo] branch master updated: [Nemo-238] Optimize
processElement in TaskExecutor (#131)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 5c03023 [Nemo-238] Optimize processElement in TaskExecutor (#131)
5c03023 is described below
commit 5c030232e7ba1b98c4ad0cfad36f3a8125da6f88
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Sun Oct 28 15:38:17 2018 +0900
[Nemo-238] Optimize processElement in TaskExecutor (#131)
JIRA: [NEMO-238: Optimize processElement in TaskExecutor](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-238)
**Major changes:**
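- Refactor `processElement`: each `DataFetcher` now holds an `OutputCollector` (a new `DataFetcherOutputCollector` for parent-task reads), so fetched elements are emitted directly instead of checking the vertex type with `instanceof` for every element.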
**Minor changes to note:**
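- Rename `OutputCollectorImpl` to `OperatorVertexOutputCollector`.
- Increase the `PerKeyMedianITCase` timeout from 60 s to 120 s.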
**Tests for the changes:**
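- Update `ParentTaskDataFetcherTest` to pass a mocked `OutputCollector` instead of a mocked `VertexHarness`.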
**Other comments:**
-
Closes #131
---
.../nemo/examples/beam/PerKeyMedianITCase.java | 2 +-
.../datatransfer/DataFetcherOutputCollector.java | 50 ++++++++++++++++++++++
...mpl.java => OperatorVertexOutputCollector.java} | 14 +++---
.../nemo/runtime/executor/task/DataFetcher.java | 15 +++----
.../executor/task/ParentTaskDataFetcher.java | 6 ++-
.../executor/task/SourceVertexDataFetcher.java | 5 ++-
.../nemo/runtime/executor/task/TaskExecutor.java | 29 ++++---------
.../executor/task/ParentTaskDataFetcherTest.java | 3 +-
8 files changed, 81 insertions(+), 43 deletions(-)
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java
index 37f46ca..41fc6c6 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java
@@ -35,7 +35,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
public final class PerKeyMedianITCase {
- private static final int TIMEOUT = 60 * 1000;
+ private static final int TIMEOUT = 120 * 1000;
private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
new file mode 100644
index 0000000..0a64042
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Receives elements fetched by a DataFetcher and passes each one to the next operator's transform.
+ * @param <O> output type.
+ */
+public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataFetcherOutputCollector.class.getName());
+  private final OperatorVertex nextOperatorVertex;
+
+  /** Creates a collector whose main output is fed to {@code nextOperatorVertex}'s transform.
+   * @param nextOperatorVertex operator whose transform receives each element emitted on the main output.
+   */
+  public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
+    this.nextOperatorVertex = nextOperatorVertex;
+  }
+
+  @Override
+  public void emit(final O output) {
+    nextOperatorVertex.getTransform().onData(output);
+  }
+
+  @Override
+  public <T> void emit(final String dstVertexId, final T output) {
+    throw new UnsupportedOperationException("No additional output tag in DataFetcherOutputCollector: " + dstVertexId);
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
similarity index 84%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
rename to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 4a56ff0..715fde8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -36,8 +36,8 @@ import java.util.*;
*
* @param <O> output type.
*/
-public final class OutputCollectorImpl<O> implements OutputCollector<O> {
- private static final Logger LOG = LoggerFactory.getLogger(OutputCollectorImpl.class.getName());
+public final class OperatorVertexOutputCollector<O> implements OutputCollector<O> {
+ private static final Logger LOG = LoggerFactory.getLogger(OperatorVertexOutputCollector.class.getName());
private final IRVertex irVertex;
private final List<OperatorVertex> internalMainOutputs;
@@ -53,11 +53,11 @@ public final class OutputCollectorImpl<O> implements OutputCollector<O> {
* @param externalMainOutputs external main outputs
* @param externalAdditionalOutputs external additional outputs
*/
- public OutputCollectorImpl(final IRVertex irVertex,
- final List<OperatorVertex> internalMainOutputs,
- final Map<String, List<OperatorVertex>> internalAdditionalOutputs,
- final List<OutputWriter> externalMainOutputs,
- final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
+ public OperatorVertexOutputCollector(final IRVertex irVertex,
+ final List<OperatorVertex> internalMainOutputs,
+ final Map<String, List<OperatorVertex>> internalAdditionalOutputs,
+ final List<OutputWriter> externalMainOutputs,
+ final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
this.irVertex = irVertex;
this.internalMainOutputs = internalMainOutputs;
this.internalAdditionalOutputs = internalAdditionalOutputs;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
index e3b3e43..5b4777c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.runtime.executor.task;
+import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import java.io.IOException;
@@ -27,12 +28,12 @@ import java.io.IOException;
*/
abstract class DataFetcher {
private final IRVertex dataSource;
- private final VertexHarness child;
+ private final OutputCollector outputCollector;
DataFetcher(final IRVertex dataSource,
- final VertexHarness child) {
+ final OutputCollector outputCollector) {
this.dataSource = dataSource;
- this.child = child;
+ this.outputCollector = outputCollector;
}
/**
@@ -43,11 +44,7 @@ abstract class DataFetcher {
*/
abstract Object fetchDataElement() throws IOException;
- VertexHarness getChild() {
- return child;
- }
-
- public IRVertex getDataSource() {
- return dataSource;
+ OutputCollector getOutputCollector() {
+ return outputCollector;
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index d1b4ccb..70680cc 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.runtime.executor.task;
+import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.datatransfer.InputReader;
@@ -49,8 +50,9 @@ class ParentTaskDataFetcher extends DataFetcher {
private long serBytes = 0;
private long encodedBytes = 0;
- ParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask, final VertexHarness child) {
- super(dataSource, child);
+ ParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask,
+ final OutputCollector outputCollector) {
+ super(dataSource, outputCollector);
this.readersForParentTask = readerForParentTask;
this.firstFetch = true;
this.currentIteratorIndex = 0;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index dc42642..505d246 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.runtime.executor.task;
+import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.vertex.IRVertex;
@@ -37,8 +38,8 @@ class SourceVertexDataFetcher extends DataFetcher {
SourceVertexDataFetcher(final IRVertex dataSource,
final Readable readable,
- final VertexHarness child) {
- super(dataSource, child);
+ final OutputCollector outputCollector) {
+ super(dataSource, outputCollector);
this.readable = readable;
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index e920014..fac3e76 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -49,7 +49,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nemo.runtime.executor.datatransfer.DynOptDataOutputCollector;
-import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
+import org.apache.nemo.runtime.executor.datatransfer.OperatorVertexOutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,7 +177,7 @@ public final class TaskExecutor {
outputCollector = new DynOptDataOutputCollector(
irVertex, persistentConnectionToMasterMap, this);
} else {
- outputCollector = new OutputCollectorImpl(
+ outputCollector = new OperatorVertexOutputCollector(
irVertex, internalMainOutputs, internalAdditionalOutputMap,
externalMainOutputs, externalAdditionalOutputMap);
}
@@ -194,7 +194,7 @@ public final class TaskExecutor {
// Source read
if (irVertex instanceof SourceVertex) {
// Source vertex read
- nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex, sourceReader.get(), vertexHarness));
+ nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex, sourceReader.get(), outputCollector));
}
// Parent-task read (broadcasts)
@@ -224,7 +224,8 @@ public final class TaskExecutor {
final List<InputReader> nonBroadcastReaders =
getParentTaskReaders(taskIndex, nonBroadcastInEdges, dataTransferFactory);
nonBroadcastReaders.forEach(parentTaskReader -> nonBroadcastDataFetcherList.add(
- new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader, vertexHarness)));
+ new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+ new DataFetcherOutputCollector((OperatorVertex) irVertex))));
});
final List<VertexHarness> sortedHarnessList = irVertexDag.getTopologicalSort()
@@ -237,23 +238,9 @@ public final class TaskExecutor {
/**
* Process a data element down the DAG dependency.
- *
- * @param vertexHarness VertexHarness of a vertex to execute.
- * @param dataElement input data element to process.
*/
- private void processElement(final VertexHarness vertexHarness, final Object dataElement) {
- final IRVertex irVertex = vertexHarness.getIRVertex();
- final OutputCollector outputCollector = vertexHarness.getOutputCollector();
-
- // TODO #XXX: optimize processElement (do not check instanceof for each data element)
- if (irVertex instanceof SourceVertex) {
- outputCollector.emit(dataElement);
- } else if (irVertex instanceof OperatorVertex) {
- final Transform transform = ((OperatorVertex) irVertex).getTransform();
- transform.onData(dataElement);
- } else {
- throw new UnsupportedOperationException("This type of IRVertex is not supported");
- }
+ private void processElement(final OutputCollector outputCollector, final Object dataElement) {
+ outputCollector.emit(dataElement);
}
/**
@@ -348,7 +335,7 @@ public final class TaskExecutor {
}
// Successfully fetched an element
- processElement(dataFetcher.getChild(), element);
+ processElement(dataFetcher.getOutputCollector(), element);
}
// Remove the finished fetcher from the list
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 1f40faa..b120241 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.runtime.executor.task;
+import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.datatransfer.InputReader;
@@ -121,7 +122,7 @@ public final class ParentTaskDataFetcherTest {
return new ParentTaskDataFetcher(
mock(IRVertex.class),
readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
- mock(VertexHarness.class));
+ mock(OutputCollector.class));
}
private InputReader generateInputReader(final CompletableFuture completableFuture) {