You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/10/28 06:38:19 UTC

[GitHub] johnyangk closed pull request #131: [Nemo-238] Optimize processElement in TaskExecutor

johnyangk closed pull request #131: [Nemo-238] Optimize processElement in TaskExecutor
URL: https://github.com/apache/incubator-nemo/pull/131
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java
index 37f46ca8d..41fc6c602 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java
@@ -35,7 +35,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public final class PerKeyMedianITCase {
-  private static final int TIMEOUT = 60 * 1000;
+  private static final int TIMEOUT = 120 * 1000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
new file mode 100644
index 000000000..0a64042ed
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This collector receives data from DataFetcher and forwards it to the next operator.
+ * @param <O> output type.
+ */
+public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataFetcherOutputCollector.class.getName());
+  private final OperatorVertex nextOperatorVertex; // vertex whose transform consumes every emitted element
+
+  /** It forwards output to the next operator.
+   * @param nextOperatorVertex the operator vertex whose transform receives each element given to emit.
+   */
+  public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
+    this.nextOperatorVertex = nextOperatorVertex;
+  }
+
+  @Override
+  public void emit(final O output) {
+    nextOperatorVertex.getTransform().onData(output); // main output goes straight to the next transform
+  }
+
+  @Override
+  public <T> void emit(final String dstVertexId, final T output) {
+    throw new RuntimeException("No additional output tag in DataFetcherOutputCollector"); // always throws
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
similarity index 84%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
rename to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 4a56ff0bd..715fde8eb 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -36,8 +36,8 @@
  *
  * @param <O> output type.
  */
-public final class OutputCollectorImpl<O> implements OutputCollector<O> {
-  private static final Logger LOG = LoggerFactory.getLogger(OutputCollectorImpl.class.getName());
+public final class OperatorVertexOutputCollector<O> implements OutputCollector<O> {
+  private static final Logger LOG = LoggerFactory.getLogger(OperatorVertexOutputCollector.class.getName());
 
   private final IRVertex irVertex;
   private final List<OperatorVertex> internalMainOutputs;
@@ -53,11 +53,11 @@
    * @param externalMainOutputs external main outputs
    * @param externalAdditionalOutputs external additional outputs
    */
-  public OutputCollectorImpl(final IRVertex irVertex,
-                             final List<OperatorVertex> internalMainOutputs,
-                             final Map<String, List<OperatorVertex>> internalAdditionalOutputs,
-                             final List<OutputWriter> externalMainOutputs,
-                             final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
+  public OperatorVertexOutputCollector(final IRVertex irVertex,
+                                       final List<OperatorVertex> internalMainOutputs,
+                                       final Map<String, List<OperatorVertex>> internalAdditionalOutputs,
+                                       final List<OutputWriter> externalMainOutputs,
+                                       final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
     this.irVertex = irVertex;
     this.internalMainOutputs = internalMainOutputs;
     this.internalAdditionalOutputs = internalAdditionalOutputs;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
index e3b3e4364..5b4777c33 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 
 import java.io.IOException;
@@ -27,12 +28,12 @@
  */
 abstract class DataFetcher {
   private final IRVertex dataSource;
-  private final VertexHarness child;
+  private final OutputCollector outputCollector;
 
   DataFetcher(final IRVertex dataSource,
-              final VertexHarness child) {
+              final OutputCollector outputCollector) {
     this.dataSource = dataSource;
-    this.child = child;
+    this.outputCollector = outputCollector;
   }
 
   /**
@@ -43,11 +44,7 @@
    */
   abstract Object fetchDataElement() throws IOException;
 
-  VertexHarness getChild() {
-    return child;
-  }
-
-  public IRVertex getDataSource() {
-    return dataSource;
+  OutputCollector getOutputCollector() {
+    return outputCollector;
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index d1b4ccb0b..70680ccb8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
@@ -49,8 +50,9 @@
   private long serBytes = 0;
   private long encodedBytes = 0;
 
-  ParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask, final VertexHarness child) {
-    super(dataSource, child);
+  ParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask,
+                        final OutputCollector outputCollector) {
+    super(dataSource, outputCollector);
     this.readersForParentTask = readerForParentTask;
     this.firstFetch = true;
     this.currentIteratorIndex = 0;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index dc4264265..505d24676 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 
@@ -37,8 +38,8 @@
 
   SourceVertexDataFetcher(final IRVertex dataSource,
                           final Readable readable,
-                          final VertexHarness child) {
-    super(dataSource, child);
+                          final OutputCollector outputCollector) {
+    super(dataSource, outputCollector);
     this.readable = readable;
   }
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index e920014d8..fac3e768c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -49,7 +49,7 @@
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.nemo.runtime.executor.datatransfer.DynOptDataOutputCollector;
-import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
+import org.apache.nemo.runtime.executor.datatransfer.OperatorVertexOutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,7 +177,7 @@ public TaskExecutor(final Task task,
         outputCollector = new DynOptDataOutputCollector(
           irVertex, persistentConnectionToMasterMap, this);
       } else {
-        outputCollector = new OutputCollectorImpl(
+        outputCollector = new OperatorVertexOutputCollector(
           irVertex, internalMainOutputs, internalAdditionalOutputMap,
           externalMainOutputs, externalAdditionalOutputMap);
       }
@@ -194,7 +194,7 @@ public TaskExecutor(final Task task,
       // Source read
       if (irVertex instanceof SourceVertex) {
         // Source vertex read
-        nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex, sourceReader.get(), vertexHarness));
+        nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex, sourceReader.get(), outputCollector));
       }
 
       // Parent-task read (broadcasts)
@@ -224,7 +224,8 @@ public TaskExecutor(final Task task,
       final List<InputReader> nonBroadcastReaders =
         getParentTaskReaders(taskIndex, nonBroadcastInEdges, dataTransferFactory);
       nonBroadcastReaders.forEach(parentTaskReader -> nonBroadcastDataFetcherList.add(
-        new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader, vertexHarness)));
+        new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+          new DataFetcherOutputCollector((OperatorVertex) irVertex))));
     });
 
     final List<VertexHarness> sortedHarnessList = irVertexDag.getTopologicalSort()
@@ -237,23 +238,9 @@ public TaskExecutor(final Task task,
 
   /**
    * Process a data element down the DAG dependency.
-   *
-   * @param vertexHarness VertexHarness of a vertex to execute.
-   * @param dataElement   input data element to process.
    */
-  private void processElement(final VertexHarness vertexHarness, final Object dataElement) {
-    final IRVertex irVertex = vertexHarness.getIRVertex();
-    final OutputCollector outputCollector = vertexHarness.getOutputCollector();
-
-    // TODO #XXX: optimize processElement (do not check instanceof for each data element)
-    if (irVertex instanceof SourceVertex) {
-      outputCollector.emit(dataElement);
-    } else if (irVertex instanceof OperatorVertex) {
-      final Transform transform = ((OperatorVertex) irVertex).getTransform();
-      transform.onData(dataElement);
-    } else {
-      throw new UnsupportedOperationException("This type of IRVertex is not supported");
-    }
+  private void processElement(final OutputCollector outputCollector, final Object dataElement) {
+    outputCollector.emit(dataElement);
   }
 
   /**
@@ -348,7 +335,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
         }
 
         // Successfully fetched an element
-        processElement(dataFetcher.getChild(), element);
+        processElement(dataFetcher.getOutputCollector(), element);
       }
 
       // Remove the finished fetcher from the list
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 1f40faaa4..b12024184 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
@@ -121,7 +122,7 @@ private ParentTaskDataFetcher createFetcher(final InputReader readerForParentTas
     return new ParentTaskDataFetcher(
         mock(IRVertex.class),
         readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
-        mock(VertexHarness.class));
+        mock(OutputCollector.class));
   }
 
   private InputReader generateInputReader(final CompletableFuture completableFuture) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services