You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/28 06:38:21 UTC

[incubator-nemo] branch master updated: [Nemo-238] Optimize processElement in TaskExecutor (#131)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c03023  [Nemo-238] Optimize processElement in TaskExecutor (#131)
5c03023 is described below

commit 5c030232e7ba1b98c4ad0cfad36f3a8125da6f88
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Sun Oct 28 15:38:17 2018 +0900

    [Nemo-238] Optimize processElement in TaskExecutor (#131)
    
    JIRA: [NEMO-238: Optimize processElement in TaskExecutor](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-238)
    
    **Major changes:**
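    - Refactor `processElement` by creating `DataFetcherOutputCollector`: each `DataFetcher` now
      holds an `OutputCollector` instead of a `VertexHarness`, so `processElement` simply calls
      `emit` instead of checking the vertex type with `instanceof` for every element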
    
    **Minor changes to note:**
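    - Rename `OutputCollectorImpl` to `OperatorVertexOutputCollector`
    - Increase the `PerKeyMedianITCase` timeout from 60 s to 120 s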
    
    **Tests for the changes:**
    - Update `ParentTaskDataFetcherTest` to mock `OutputCollector` instead of `VertexHarness`
    
    **Other comments:**
    -
    
    Closes #131
---
 .../nemo/examples/beam/PerKeyMedianITCase.java     |  2 +-
 .../datatransfer/DataFetcherOutputCollector.java   | 50 ++++++++++++++++++++++
 ...mpl.java => OperatorVertexOutputCollector.java} | 14 +++---
 .../nemo/runtime/executor/task/DataFetcher.java    | 15 +++----
 .../executor/task/ParentTaskDataFetcher.java       |  6 ++-
 .../executor/task/SourceVertexDataFetcher.java     |  5 ++-
 .../nemo/runtime/executor/task/TaskExecutor.java   | 29 ++++---------
 .../executor/task/ParentTaskDataFetcherTest.java   |  3 +-
 8 files changed, 81 insertions(+), 43 deletions(-)

diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java
index 37f46ca..41fc6c6 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java
@@ -35,7 +35,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public final class PerKeyMedianITCase {
-  private static final int TIMEOUT = 60 * 1000;
+  private static final int TIMEOUT = 120 * 1000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
new file mode 100644
index 0000000..0a64042
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Forwards elements fetched by a DataFetcher directly into the next operator's transform via onData.
+ * @param <O> output type.
+ */
+public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataFetcherOutputCollector.class.getName());
+  private final OperatorVertex nextOperatorVertex;
+
+  /**
+   * Creates a collector whose main output is fed to {@code nextOperatorVertex}'s transform.
+   */
+  public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
+    this.nextOperatorVertex = nextOperatorVertex;
+  }
+
+  @Override
+  public void emit(final O output) {
+    nextOperatorVertex.getTransform().onData(output); // direct call; no per-element type dispatch
+  }
+
+  @Override
+  public <T> void emit(final String dstVertexId, final T output) { // additional outputs unsupported
+    throw new RuntimeException("No additional output tag in DataFetcherOutputCollector");
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
similarity index 84%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
rename to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 4a56ff0..715fde8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -36,8 +36,8 @@ import java.util.*;
  *
  * @param <O> output type.
  */
-public final class OutputCollectorImpl<O> implements OutputCollector<O> {
-  private static final Logger LOG = LoggerFactory.getLogger(OutputCollectorImpl.class.getName());
+public final class OperatorVertexOutputCollector<O> implements OutputCollector<O> {
+  private static final Logger LOG = LoggerFactory.getLogger(OperatorVertexOutputCollector.class.getName());
 
   private final IRVertex irVertex;
   private final List<OperatorVertex> internalMainOutputs;
@@ -53,11 +53,11 @@ public final class OutputCollectorImpl<O> implements OutputCollector<O> {
    * @param externalMainOutputs external main outputs
    * @param externalAdditionalOutputs external additional outputs
    */
-  public OutputCollectorImpl(final IRVertex irVertex,
-                             final List<OperatorVertex> internalMainOutputs,
-                             final Map<String, List<OperatorVertex>> internalAdditionalOutputs,
-                             final List<OutputWriter> externalMainOutputs,
-                             final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
+  public OperatorVertexOutputCollector(final IRVertex irVertex,
+                                       final List<OperatorVertex> internalMainOutputs,
+                                       final Map<String, List<OperatorVertex>> internalAdditionalOutputs,
+                                       final List<OutputWriter> externalMainOutputs,
+                                       final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
     this.irVertex = irVertex;
     this.internalMainOutputs = internalMainOutputs;
     this.internalAdditionalOutputs = internalAdditionalOutputs;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
index e3b3e43..5b4777c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 
 import java.io.IOException;
@@ -27,12 +28,12 @@ import java.io.IOException;
  */
 abstract class DataFetcher {
   private final IRVertex dataSource;
-  private final VertexHarness child;
+  private final OutputCollector outputCollector;
 
   DataFetcher(final IRVertex dataSource,
-              final VertexHarness child) {
+              final OutputCollector outputCollector) {
     this.dataSource = dataSource;
-    this.child = child;
+    this.outputCollector = outputCollector;
   }
 
   /**
@@ -43,11 +44,7 @@ abstract class DataFetcher {
    */
   abstract Object fetchDataElement() throws IOException;
 
-  VertexHarness getChild() {
-    return child;
-  }
-
-  public IRVertex getDataSource() {
-    return dataSource;
+  OutputCollector getOutputCollector() {
+    return outputCollector;
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index d1b4ccb..70680cc 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
@@ -49,8 +50,9 @@ class ParentTaskDataFetcher extends DataFetcher {
   private long serBytes = 0;
   private long encodedBytes = 0;
 
-  ParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask, final VertexHarness child) {
-    super(dataSource, child);
+  ParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask,
+                        final OutputCollector outputCollector) {
+    super(dataSource, outputCollector);
     this.readersForParentTask = readerForParentTask;
     this.firstFetch = true;
     this.currentIteratorIndex = 0;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index dc42642..505d246 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 
@@ -37,8 +38,8 @@ class SourceVertexDataFetcher extends DataFetcher {
 
   SourceVertexDataFetcher(final IRVertex dataSource,
                           final Readable readable,
-                          final VertexHarness child) {
-    super(dataSource, child);
+                          final OutputCollector outputCollector) {
+    super(dataSource, outputCollector);
     this.readable = readable;
   }
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index e920014..fac3e76 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -49,7 +49,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.nemo.runtime.executor.datatransfer.DynOptDataOutputCollector;
-import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
+import org.apache.nemo.runtime.executor.datatransfer.OperatorVertexOutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,7 +177,7 @@ public final class TaskExecutor {
         outputCollector = new DynOptDataOutputCollector(
           irVertex, persistentConnectionToMasterMap, this);
       } else {
-        outputCollector = new OutputCollectorImpl(
+        outputCollector = new OperatorVertexOutputCollector(
           irVertex, internalMainOutputs, internalAdditionalOutputMap,
           externalMainOutputs, externalAdditionalOutputMap);
       }
@@ -194,7 +194,7 @@ public final class TaskExecutor {
       // Source read
       if (irVertex instanceof SourceVertex) {
         // Source vertex read
-        nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex, sourceReader.get(), vertexHarness));
+        nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex, sourceReader.get(), outputCollector));
       }
 
       // Parent-task read (broadcasts)
@@ -224,7 +224,8 @@ public final class TaskExecutor {
       final List<InputReader> nonBroadcastReaders =
         getParentTaskReaders(taskIndex, nonBroadcastInEdges, dataTransferFactory);
       nonBroadcastReaders.forEach(parentTaskReader -> nonBroadcastDataFetcherList.add(
-        new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader, vertexHarness)));
+        new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+          new DataFetcherOutputCollector((OperatorVertex) irVertex))));
     });
 
     final List<VertexHarness> sortedHarnessList = irVertexDag.getTopologicalSort()
@@ -237,23 +238,9 @@ public final class TaskExecutor {
 
   /**
    * Process a data element down the DAG dependency.
-   *
-   * @param vertexHarness VertexHarness of a vertex to execute.
-   * @param dataElement   input data element to process.
    */
-  private void processElement(final VertexHarness vertexHarness, final Object dataElement) {
-    final IRVertex irVertex = vertexHarness.getIRVertex();
-    final OutputCollector outputCollector = vertexHarness.getOutputCollector();
-
-    // TODO #XXX: optimize processElement (do not check instanceof for each data element)
-    if (irVertex instanceof SourceVertex) {
-      outputCollector.emit(dataElement);
-    } else if (irVertex instanceof OperatorVertex) {
-      final Transform transform = ((OperatorVertex) irVertex).getTransform();
-      transform.onData(dataElement);
-    } else {
-      throw new UnsupportedOperationException("This type of IRVertex is not supported");
-    }
+  private void processElement(final OutputCollector outputCollector, final Object dataElement) {
+    outputCollector.emit(dataElement);
   }
 
   /**
@@ -348,7 +335,7 @@ public final class TaskExecutor {
         }
 
         // Successfully fetched an element
-        processElement(dataFetcher.getChild(), element);
+        processElement(dataFetcher.getOutputCollector(), element);
       }
 
       // Remove the finished fetcher from the list
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 1f40faa..b120241 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
@@ -121,7 +122,7 @@ public final class ParentTaskDataFetcherTest {
     return new ParentTaskDataFetcher(
         mock(IRVertex.class),
         readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
-        mock(VertexHarness.class));
+        mock(OutputCollector.class));
   }
 
   private InputReader generateInputReader(final CompletableFuture completableFuture) {