Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/10/25 23:57:21 UTC

[GitHub] johnyangk closed pull request #128: [NEMO-235] Refactor TaskExecutor's data processing logic

johnyangk closed pull request #128: [NEMO-235] Refactor TaskExecutor's data processing logic
URL: https://github.com/apache/incubator-nemo/pull/128
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
index 87db84f86..b59344a7e 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
@@ -17,7 +17,6 @@
 
 import org.apache.nemo.common.ir.OutputCollector;
 import java.io.Serializable;
-import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -54,7 +53,6 @@
      * @return the broadcast variable.
      */
     Object getBroadcastVariable(Serializable id);
-    Map<String, String> getTagToAdditionalChildren();
 
     /**
      * Put serialized data to send to the executor.
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 8679c7348..444dfb40a 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -119,8 +119,7 @@ public final void prepare(final Context context, final OutputCollector<WindowedV
     this.outputCollector = oc;
 
     // create output manager
-    outputManager = new DefaultOutputManager<>(
-      outputCollector, context, mainOutputTag);
+    outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);
 
     // create side input reader
     if (!sideInputs.isEmpty()) {
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
index 4174c6ca0..457a12812 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
@@ -19,9 +19,6 @@
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-
-import java.util.Map;
 
 /**
  * Default output emitter that uses outputCollector.
@@ -30,14 +27,11 @@
 public final class DefaultOutputManager<OutputT> implements DoFnRunners.OutputManager {
   private final TupleTag<OutputT> mainOutputTag;
   private final OutputCollector<WindowedValue<OutputT>> outputCollector;
-  private final Map<String, String> additionalOutputs;
 
   DefaultOutputManager(final OutputCollector<WindowedValue<OutputT>> outputCollector,
-                       final Transform.Context context,
                        final TupleTag<OutputT> mainOutputTag) {
     this.outputCollector = outputCollector;
     this.mainOutputTag = mainOutputTag;
-    this.additionalOutputs = context.getTagToAdditionalChildren();
   }
 
   @Override
@@ -45,7 +39,7 @@
     if (tag.equals(mainOutputTag)) {
       outputCollector.emit((WindowedValue<OutputT>) output);
     } else {
-      outputCollector.emit(additionalOutputs.get(tag.getId()), output);
+      outputCollector.emit(tag.getId(), output);
     }
   }
 }
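
With this change a DoFn's tagged output no longer goes through the
context's tag-to-child map: the output manager forwards the TupleTag id
as-is, and the executor-side collector resolves it. A minimal Beam-side
sketch of the flow this assumes (the tags and DoFn below are hypothetical,
not part of this PR):

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.TupleTag;

  // c.output(SIDE, v) reaches DefaultOutputManager#output(SIDE, wv), which
  // now calls outputCollector.emit(SIDE.getId(), wv) directly.
  final class TaggingDoFn extends DoFn<String, String> {
    static final TupleTag<String> MAIN = new TupleTag<String>() {};
    static final TupleTag<String> SIDE = new TupleTag<String>("side") {};

    @ProcessElement
    public void process(final ProcessContext c) {
      c.output(c.element());              // main output: emit(element)
      c.output(SIDE, c.element() + "!");  // tagged output: emit("side", element)
    }
  }
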
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index e4743b5c6..eea586744 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -53,7 +53,7 @@
 @Requires(CommunicationPatternProperty.class)
 public final class SkewReshapingPass extends ReshapingPass {
   private static final Logger LOG = LoggerFactory.getLogger(SkewReshapingPass.class.getName());
-
+  private static final String ADDITIONAL_OUTPUT_TAG = "DynOptData";
   /**
    * Default constructor.
    */
@@ -154,7 +154,7 @@ private OperatorVertex generateMetricCollectVertex(final IREdge edge, final Oper
         (dynOptData, outputCollector)-> {
           dynOptData.forEach((k, v) -> {
             final Pair<Object, Object> pairData = Pair.of(k, v);
-            outputCollector.emit(abv.getId(), pairData);
+            outputCollector.emit(ADDITIONAL_OUTPUT_TAG, pairData);
           });
           return dynOptData;
         };
@@ -180,7 +180,7 @@ private IREdge generateEdgeToABV(final IREdge edge,
     newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
     newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
     newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
-    newEdge.setProperty(AdditionalOutputTagProperty.of("DynOptData"));
+    newEdge.setProperty(AdditionalOutputTagProperty.of(ADDITIONAL_OUTPUT_TAG));
 
     // Dynamic optimization handles statistics on key-value data by default.
     // We need to get coders for encoding/decoding the keys to send data to
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index 9a65e7a56..db04f1e89 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -115,7 +115,6 @@ public void testMultiOutputOutput() {
 
     // mock context
     final Transform.Context context = mock(Transform.Context.class);
-    when(context.getTagToAdditionalChildren()).thenReturn(tagsMap);
 
     final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
     doFnTransform.prepare(context, oc);
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
index 682cf3ca7..bd222715c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
@@ -19,7 +19,6 @@
 import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
 
 import java.io.Serializable;
-import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -27,18 +26,14 @@
  */
 public final class TransformContextImpl implements Transform.Context {
   private final BroadcastManagerWorker broadcastManagerWorker;
-  private final Map<String, String> tagToAdditionalChildren;
   private String data;
 
   /**
    * Constructor of Context Implementation.
    * @param broadcastManagerWorker for broadcast variables.
-   * @param tagToAdditionalChildren tag id to additional vertices id map.
    */
-  public TransformContextImpl(final BroadcastManagerWorker broadcastManagerWorker,
-                              final Map<String, String> tagToAdditionalChildren) {
+  public TransformContextImpl(final BroadcastManagerWorker broadcastManagerWorker) {
     this.broadcastManagerWorker = broadcastManagerWorker;
-    this.tagToAdditionalChildren = tagToAdditionalChildren;
     this.data = null;
   }
 
@@ -47,11 +42,6 @@ public Object getBroadcastVariable(final Serializable tag) {
     return broadcastManagerWorker.get(tag);
   }
 
-  @Override
-  public Map<String, String> getTagToAdditionalChildren() {
-    return this.tagToAdditionalChildren;
-  }
-
   @Override
   public void setSerializedData(final String serializedData) {
     this.data = serializedData;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
new file mode 100644
index 000000000..c2a013a59
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
+import org.apache.nemo.runtime.executor.task.TaskExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * OutputCollector for dynamic optimization data.
+ *
+ * @param <O> output type.
+ */
+public final class DynOptDataOutputCollector<O> implements OutputCollector<O> {
+  private static final Logger LOG = LoggerFactory.getLogger(DynOptDataOutputCollector.class.getName());
+  private static final String NULL_KEY = "NULL";
+
+  private final IRVertex irVertex;
+  private final PersistentConnectionToMasterMap connectionToMasterMap;
+  private final TaskExecutor taskExecutor;
+
+  public DynOptDataOutputCollector(final IRVertex irVertex,
+                                   final PersistentConnectionToMasterMap connectionToMasterMap,
+                                   final TaskExecutor taskExecutor) {
+    this.irVertex = irVertex;
+    this.connectionToMasterMap = connectionToMasterMap;
+    this.taskExecutor = taskExecutor;
+  }
+
+  @Override
+  public void emit(final O output) {
+    final Map<Object, Long> aggregatedDynOptData = (Map<Object, Long>) output;
+    final List<ControlMessage.PartitionSizeEntry> partitionSizeEntries = new ArrayList<>();
+    aggregatedDynOptData.forEach((key, size) ->
+      partitionSizeEntries.add(
+        ControlMessage.PartitionSizeEntry.newBuilder()
+          .setKey(key == null ? NULL_KEY : String.valueOf(key))
+          .setSize(size)
+          .build())
+    );
+
+    connectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+      .send(ControlMessage.Message.newBuilder()
+        .setId(RuntimeIdManager.generateMessageId())
+        .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+        .setType(ControlMessage.MessageType.DataSizeMetric)
+        .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
+          .addAllPartitionSize(partitionSizeEntries)
+        )
+        .build());
+
+    // set the id of this vertex to mark the corresponding stage as put on hold
+    taskExecutor.setIRVertexPutOnHold(irVertex);
+  }
+
+  @Override
+  public <T> void emit(final String dstVertexId, final T output) {
+    throw new IllegalStateException("Dynamic optimization does not emit tagged data");
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
index b489cbb4c..aa510e91b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
@@ -15,8 +15,9 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,110 +25,75 @@
 
 /**
  * OutputCollector implementation.
+ * This emits four types of outputs
+ * 1) internal main outputs: this output becomes the input of internal Transforms
+ * 2) internal additional outputs: this additional output becomes the input of internal Transforms
+ * 3) external main outputs: this external output is emitted to OutputWriter
+ * 4) external additional outputs: this external output is emitted to OutputWriter
  *
  * @param <O> output type.
  */
 public final class OutputCollectorImpl<O> implements OutputCollector<O> {
   private static final Logger LOG = LoggerFactory.getLogger(OutputCollectorImpl.class.getName());
-  private final Set<String> mainTagOutputChildren;
-  // Use ArrayList (not Queue) to allow 'null' values
-  private final ArrayList<O> mainTagElements;
-  // Key: Pair of tag and destination vertex id
-  // Value: data elements which will be input to the tagged destination vertex
-  private final Map<Pair<String, String>, ArrayList<Object>> additionalTaggedChildToElementsMap;
+
+  private final IRVertex irVertex;
+  private final List<OperatorVertex> internalMainOutputs;
+  private final Map<String, List<OperatorVertex>> internalAdditionalOutputs;
+  private final List<OutputWriter> externalMainOutputs;
+  private final Map<String, List<OutputWriter>> externalAdditionalOutputs;
 
   /**
-   * Constructor of a new OutputCollectorImpl with tagged outputs.
-   * @param mainChildren   main children vertices
-   * @param tagToChildren additional children vertices
+   * Constructor of the output collector.
+   * @param irVertex the ir vertex that emits the output
+   * @param internalMainOutputs internal main outputs
+   * @param internalAdditionalOutputs internal additional outputs
+   * @param externalMainOutputs external main outputs
+   * @param externalAdditionalOutputs external additional outputs
    */
-  public OutputCollectorImpl(final Set<String> mainChildren,
-                             final Map<String, String> tagToChildren) {
-    this.mainTagOutputChildren = mainChildren;
-    this.mainTagElements = new ArrayList<>(1);
-    this.additionalTaggedChildToElementsMap = new HashMap<>();
-    tagToChildren.forEach((tag, child) ->
-      this.additionalTaggedChildToElementsMap.put(Pair.of(tag, child), new ArrayList<>(1)));
-  }
-
-  @Override
-  public void emit(final O output) {
-    mainTagElements.add(output);
+  public OutputCollectorImpl(final IRVertex irVertex,
+                             final List<OperatorVertex> internalMainOutputs,
+                             final Map<String, List<OperatorVertex>> internalAdditionalOutputs,
+                             final List<OutputWriter> externalMainOutputs,
+                             final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
+    this.irVertex = irVertex;
+    this.internalMainOutputs = internalMainOutputs;
+    this.internalAdditionalOutputs = internalAdditionalOutputs;
+    this.externalMainOutputs = externalMainOutputs;
+    this.externalAdditionalOutputs = externalAdditionalOutputs;
   }
 
-  @Override
-  public <T> void emit(final String dstVertexId, final T output) {
-    if (this.mainTagOutputChildren.contains(dstVertexId)) {
-      // This dstVertexId is for the main tag
-      emit((O) output);
-    } else {
-      // Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
-      final List<Object> dataElements = getAdditionalTaggedDataFromDstVertexId(dstVertexId);
-      dataElements.add(output);
-    }
+  private void emit(final OperatorVertex vertex, final O output) {
+    vertex.getTransform().onData(output);
   }
 
-  public Iterable<O> iterateMain() {
-    return mainTagElements;
+  private void emit(final OutputWriter writer, final O output) {
+    writer.write(output);
   }
 
-  public Iterable<Object> iterateTag(final String tag) {
-    if (this.mainTagOutputChildren.contains(tag)) {
-      // This dstVertexId is for the main tag
-      return (Iterable<Object>) iterateMain();
-    } else {
-      return getAdditionalTaggedDataFromTag(tag);
+  @Override
+  public void emit(final O output) {
+    for (final OperatorVertex internalVertex : internalMainOutputs) {
+      emit(internalVertex, output);
     }
-  }
-
-  public void clearMain() {
-    mainTagElements.clear();
-  }
 
-  public void clearTag(final String tag) {
-    if (this.mainTagOutputChildren.contains(tag)) {
-      // This dstVertexId is for the main tag
-      clearMain();
-    } else {
-      // Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
-      final List<Object> dataElements = getAdditionalTaggedDataFromTag(tag);
-      dataElements.clear();
+    for (final OutputWriter externalWriter : externalMainOutputs) {
+      emit(externalWriter, output);
     }
   }
 
-  public List<O> getMainTagOutputQueue() {
-    return mainTagElements;
-  }
-
-  public List<Object> getAdditionalTagOutputQueue(final String dstVertexId) {
-    if (this.mainTagOutputChildren.contains(dstVertexId)) {
-      return (List<Object>) this.mainTagElements;
-    } else {
-      return getAdditionalTaggedDataFromDstVertexId(dstVertexId);
-    }
-  }
+  @Override
+  public <T> void emit(final String dstVertexId, final T output) {
 
-  private List<Object> getAdditionalTaggedDataFromDstVertexId(final String dstVertexId) {
-    final Pair<String, String> tagAndChild =
-      this.additionalTaggedChildToElementsMap.keySet().stream()
-        .filter(key -> key.right().equals(dstVertexId))
-        .findAny().orElseThrow(() -> new RuntimeException("Wrong destination vertex id passed!"));
-    final List<Object> dataElements = this.additionalTaggedChildToElementsMap.get(tagAndChild);
-    if (dataElements == null) {
-      throw new IllegalArgumentException("Wrong destination vertex id passed!");
+    if (internalAdditionalOutputs.containsKey(dstVertexId)) {
+      for (final OperatorVertex internalVertex : internalAdditionalOutputs.get(dstVertexId)) {
+        emit(internalVertex, (O) output);
+      }
     }
-    return dataElements;
-  }
 
-  private List<Object> getAdditionalTaggedDataFromTag(final String tag) {
-    final Pair<String, String> tagAndChild =
-      this.additionalTaggedChildToElementsMap.keySet().stream()
-        .filter(key -> key.left().equals(tag))
-        .findAny().orElseThrow(() -> new RuntimeException("Wrong tag " + tag + " passed!"));
-    final List<Object> dataElements = this.additionalTaggedChildToElementsMap.get(tagAndChild);
-    if (dataElements == null) {
-      throw new IllegalArgumentException("Wrong tag " + tag + " passed!");
+    if (externalAdditionalOutputs.containsKey(dstVertexId)) {
+      for (final OutputWriter externalWriter : externalAdditionalOutputs.get(dstVertexId)) {
+        emit(externalWriter, (O) output);
+      }
     }
-    return dataElements;
   }
 }
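
The refactored collector pushes each element to its consumers as soon as
it is emitted, instead of buffering elements per tag for TaskExecutor to
drain recursively. A minimal wiring sketch, assuming vertices built as in
the TaskExecutor changes below ("bonus" is a hypothetical additional
output tag):

  import java.util.Arrays;
  import java.util.Collections;
  import org.apache.nemo.common.ir.vertex.IRVertex;
  import org.apache.nemo.common.ir.vertex.OperatorVertex;
  import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;

  final class CollectorWiringSketch {
    static OutputCollectorImpl<String> wire(final IRVertex producer,
                                            final OperatorVertex mainChild,
                                            final OperatorVertex bonusChild) {
      // emit(e) on the returned collector -> mainChild.getTransform().onData(e)
      // emit("bonus", e)                  -> bonusChild.getTransform().onData(e)
      return new OutputCollectorImpl<>(
        producer,
        Arrays.asList(mainChild),                                      // internal main outputs
        Collections.singletonMap("bonus", Arrays.asList(bonusChild)),  // internal additional outputs
        Collections.emptyList(),                                       // external main outputs (OutputWriter)
        Collections.emptyMap());                                       // external additional outputs
    }
  }
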
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 9035cbe08..4cf789ee4 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -18,6 +18,7 @@
 import com.google.common.collect.Lists;
 import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
@@ -44,6 +45,8 @@
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.nemo.runtime.executor.datatransfer.DynOptDataOutputCollector;
+import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +60,6 @@
 public final class TaskExecutor {
   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class.getName());
   private static final int NONE_FINISHED = -1;
-  private static final String NULL_KEY = "NULL";
 
   // Essential information
   private boolean isExecuted;
@@ -149,38 +151,39 @@ public TaskExecutor(final Task task,
     final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
     final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
     reverseTopologicallySorted.forEach(irVertex -> {
-      final List<VertexHarness> children = getChildrenHarnesses(irVertex, irVertexDag, vertexIdToHarness);
       final Optional<Readable> sourceReader = getSourceVertexReader(irVertex, task.getIrVertexIdToReadable());
       if (sourceReader.isPresent() != irVertex instanceof SourceVertex) {
         throw new IllegalStateException(irVertex.toString());
       }
 
-      // Prepare data WRITE
-      // Child-task writes
-      final Map<String, String> additionalOutputMap =
-          getAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), irVertexDag);
-
-      final List<Boolean> isToAdditionalTagOutputs = children.stream()
-          .map(harness -> harness.getIRVertex().getId())
-          .map(additionalOutputMap::containsValue)
-          .collect(Collectors.toList());
-
-      // Handle writes
-      // Main output children task writes
-      final List<OutputWriter> mainChildrenTaskWriters = getMainChildrenTaskWriters(
-        irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, additionalOutputMap);
-      final Map<String, OutputWriter> additionalChildrenTaskWriters = getAdditionalChildrenTaskWriters(
-        irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, additionalOutputMap);
-      // Intra-task writes
-      final List<String> additionalOutputVertices = new ArrayList<>(additionalOutputMap.values());
-      final Set<String> mainChildren =
-          getMainOutputVertices(irVertex, irVertexDag, task.getTaskOutgoingEdges(), additionalOutputVertices);
-      final OutputCollectorImpl oci = new OutputCollectorImpl(mainChildren, additionalOutputMap);
+      // Additional outputs
+      final Map<String, List<OperatorVertex>> internalAdditionalOutputMap =
+        getInternalAdditionalOutputMap(irVertex, irVertexDag);
+      final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
+        getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), dataTransferFactory);
+
+      // Main outputs
+      final List<OperatorVertex> internalMainOutputs = getInternalMainOutputs(irVertex, irVertexDag);
+      final List<OutputWriter> externalMainOutputs =
+        getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), dataTransferFactory);
+
+      final OutputCollector outputCollector;
+
+      if (irVertex instanceof OperatorVertex
+        && ((OperatorVertex) irVertex).getTransform() instanceof AggregateMetricTransform) {
+        outputCollector = new DynOptDataOutputCollector(
+          irVertex, persistentConnectionToMasterMap, this);
+      } else {
+        outputCollector = new OutputCollectorImpl(
+          irVertex, internalMainOutputs, internalAdditionalOutputMap,
+          externalMainOutputs, externalAdditionalOutputMap);
+      }
 
       // Create VERTEX HARNESS
       final VertexHarness vertexHarness = new VertexHarness(
-        irVertex, oci, children, isToAdditionalTagOutputs, mainChildrenTaskWriters, additionalChildrenTaskWriters,
-        new TransformContextImpl(broadcastManagerWorker, additionalOutputMap));
+        irVertex, outputCollector, new TransformContextImpl(broadcastManagerWorker),
+        externalMainOutputs, externalAdditionalOutputMap);
+
       prepareTransform(vertexHarness);
       vertexIdToHarness.put(irVertex.getId(), vertexHarness);
 
@@ -190,6 +193,7 @@ public TaskExecutor(final Task task,
         // Source vertex read
         nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex, sourceReader.get(), vertexHarness));
       }
+
       // Parent-task read (broadcasts)
       final List<StageEdge> inEdgesForThisVertex = task.getTaskIncomingEdges()
         .stream()
@@ -229,15 +233,16 @@ public TaskExecutor(final Task task,
   }
 
   /**
-   * Recursively process a data element down the DAG dependency.
+   * Process a data element down the DAG dependency.
    *
    * @param vertexHarness VertexHarness of a vertex to execute.
    * @param dataElement   input data element to process.
    */
-  private void processElementRecursively(final VertexHarness vertexHarness, final Object dataElement) {
+  private void processElement(final VertexHarness vertexHarness, final Object dataElement) {
     final IRVertex irVertex = vertexHarness.getIRVertex();
-    final OutputCollectorImpl outputCollector = vertexHarness.getOutputCollector();
+    final OutputCollector outputCollector = vertexHarness.getOutputCollector();
 
+    // TODO #XXX: optimize processElement (do not check instanceof for each data element)
     if (irVertex instanceof SourceVertex) {
       outputCollector.emit(dataElement);
     } else if (irVertex instanceof OperatorVertex) {
@@ -246,19 +251,6 @@ private void processElementRecursively(final VertexHarness vertexHarness, final
     } else {
       throw new UnsupportedOperationException("This type of IRVertex is not supported");
     }
-
-    // Given a single input element, a vertex can produce many output elements.
-    // Here, we recursively process all of the main output elements.
-    outputCollector.iterateMain().forEach(element ->
-      handleMainOutputElement(vertexHarness, element)); // Recursion
-    outputCollector.clearMain();
-
-    // Recursively process all of the additional output elements.
-    vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
-      outputCollector.iterateTag(tag).forEach(element ->
-        handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
-      outputCollector.clearTag(tag);
-    });
   }
 
   /**
@@ -315,85 +307,9 @@ private void doExecute() {
     }
   }
 
-  /**
-   * Send aggregated statistics for dynamic optimization to master.
-   * @param dynOptData the statistics to send.
-   */
-  public void sendDynOptData(final Object dynOptData) {
-    Map<Object, Long> aggregatedDynOptData = (Map<Object, Long>) dynOptData;
-    final List<ControlMessage.PartitionSizeEntry> partitionSizeEntries = new ArrayList<>();
-    aggregatedDynOptData.forEach((key, size) ->
-      partitionSizeEntries.add(
-        ControlMessage.PartitionSizeEntry.newBuilder()
-          .setKey(key == null ? NULL_KEY : String.valueOf(key))
-          .setSize(size)
-          .build())
-    );
-
-    persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
-      .send(ControlMessage.Message.newBuilder()
-        .setId(RuntimeIdManager.generateMessageId())
-        .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
-        .setType(ControlMessage.MessageType.DataSizeMetric)
-        .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
-          .addAllPartitionSize(partitionSizeEntries)
-        )
-        .build());
-  }
-
   private void finalizeVertex(final VertexHarness vertexHarness) {
     closeTransform(vertexHarness);
-
-    final OutputCollectorImpl outputCollector = vertexHarness.getOutputCollector();
-    final IRVertex v = vertexHarness.getIRVertex();
-    if (v instanceof OperatorVertex
-      && ((OperatorVertex) v).getTransform() instanceof AggregateMetricTransform) {
-      // send aggregated dynamic optimization data to master
-      final Object aggregatedDynOptData = outputCollector.iterateMain().iterator().next();
-      sendDynOptData(aggregatedDynOptData);
-      // set the id of this vertex to mark the corresponding stage as put on hold
-      setIRVertexPutOnHold(v);
-    } else {
-      // handle main outputs
-      outputCollector.iterateMain().forEach(element -> {
-        handleMainOutputElement(vertexHarness, element);
-      }); // Recursion
-      outputCollector.clearMain();
-
-      // handle intra-task additional tagged outputs
-      vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
-        outputCollector.iterateTag(tag).forEach(
-          element -> handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
-        outputCollector.clearTag(tag);
-      });
-
-      // handle inter-task additional tagged outputs
-      vertexHarness.getTagToAdditionalChildrenId().keySet().forEach(tag -> {
-        outputCollector.iterateTag(tag).forEach(
-          element -> handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
-        outputCollector.clearTag(tag);
-      });
-
-      finalizeOutputWriters(vertexHarness);
-    }
-  }
-
-  private void handleMainOutputElement(final VertexHarness harness, final Object element) {
-    // writes to children tasks
-    harness.getWritersToMainChildrenTasks().forEach(outputWriter -> outputWriter.write(element));
-    // process elements in the next vertices within a task
-    harness.getMainTagChildren().forEach(child -> processElementRecursively(child, element));
-  }
-
-  private void handleAdditionalOutputElement(final VertexHarness harness, final Object element, final String tag) {
-    // writes to additional children tasks
-    harness.getWritersToAdditionalChildrenTasks().entrySet().stream()
-      .filter(kv -> kv.getKey().equals(tag))
-      .forEach(kv -> kv.getValue().write(element));
-    // process elements in the next vertices within a task
-    harness.getAdditionalTagOutputChildren().entrySet().stream()
-      .filter(kv -> kv.getKey().equals(tag))
-      .forEach(kv -> processElementRecursively(kv.getValue(), element));
+    finalizeOutputWriters(vertexHarness);
   }
 
   /**
@@ -429,7 +345,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
         }
 
         // Successfully fetched an element
-        processElementRecursively(dataFetcher.getChild(), element);
+        processElement(dataFetcher.getChild(), element);
       }
 
       // Remove the finished fetcher from the list
@@ -441,76 +357,54 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
   }
 
   ////////////////////////////////////////////// Helper methods for setting up initial data structures
-
-  private Map<String, String> getAdditionalOutputMap(final IRVertex irVertex,
-                                                     final List<StageEdge> outEdgesToChildrenTasks,
-                                                     final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
-    final Map<String, String> additionalOutputMap = new HashMap<>();
-
-    // Add all intra-task additional tags to additional output map.
-    irVertexDag.getOutgoingEdgesOf(irVertex.getId())
-      .stream()
-      .filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
-      .map(edge ->
-        Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), edge.getDst().getId()))
-      .forEach(pair -> additionalOutputMap.put(pair.left(), pair.right()));
-
+  private Map<String, List<OutputWriter>> getExternalAdditionalOutputMap(
+    final IRVertex irVertex,
+    final List<StageEdge> outEdgesToChildrenTasks,
+    final DataTransferFactory dataTransferFactory) {
     // Add all inter-task additional tags to additional output map.
+    final Map<String, List<OutputWriter>> map = new HashMap<>();
+
     outEdgesToChildrenTasks
       .stream()
       .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
       .filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
       .map(edge ->
-        Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), edge.getDstIRVertex().getId()))
-      .forEach(pair -> additionalOutputMap.put(pair.left(), pair.right()));
+        Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(),
+          dataTransferFactory.createWriter(taskId, edge.getDstIRVertex(), edge)))
+      .forEach(pair -> {
+        map.putIfAbsent(pair.left(), new ArrayList<>());
+        map.get(pair.left()).add(pair.right());
+      });
 
-    return additionalOutputMap;
+    return map;
   }
 
-  private Optional<Readable> getSourceVertexReader(final IRVertex irVertex,
-                                                   final Map<String, Readable> irVertexIdToReadable) {
-    if (irVertex instanceof SourceVertex) {
-      final Readable readable = irVertexIdToReadable.get(irVertex.getId());
-      if (readable == null) {
-        throw new IllegalStateException(irVertex.toString());
-      }
-      return Optional.of(readable);
-    } else {
-      return Optional.empty();
-    }
-  }
+  private Map<String, List<OperatorVertex>> getInternalAdditionalOutputMap(
+    final IRVertex irVertex,
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+    // Add all intra-task additional tags to additional output map.
+    final Map<String, List<OperatorVertex>> map = new HashMap<>();
 
-  private List<InputReader> getParentTaskReaders(final int taskIndex,
-                                                 final List<StageEdge> inEdgesFromParentTasks,
-                                                 final DataTransferFactory dataTransferFactory) {
-    return inEdgesFromParentTasks
+    irVertexDag.getOutgoingEdgesOf(irVertex.getId())
       .stream()
-      .map(inEdgeForThisVertex -> dataTransferFactory
-        .createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(), inEdgeForThisVertex))
-      .collect(Collectors.toList());
+      .filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
+      .map(edge ->
+        Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), (OperatorVertex) edge.getDst()))
+      .forEach(pair -> {
+        map.putIfAbsent(pair.left(), new ArrayList<>());
+        map.get(pair.left()).add(pair.right());
+      });
+
+    return map;
   }
 
-  private Set<String> getMainOutputVertices(final IRVertex irVertex,
-                                            final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
-                                            final List<StageEdge> outEdgesToChildrenTasks,
-                                            final List<String> additionalOutputVertices) {
-    // all intra-task children vertices id
-    final List<String> outputVertices = irVertexDag.getOutgoingEdgesOf(irVertex).stream()
-      .filter(edge -> edge.getSrc().getId().equals(irVertex.getId()))
-      .map(edge -> edge.getDst().getId())
+  private List<OperatorVertex> getInternalMainOutputs(final IRVertex irVertex,
+                                                     final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+    return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
+      .stream()
+      .filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
+      .map(edge -> (OperatorVertex) edge.getDst())
       .collect(Collectors.toList());
-
-    // all inter-task children vertices id
-    outputVertices
-      .addAll(outEdgesToChildrenTasks.stream()
-        .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
-        .map(edge -> edge.getDstIRVertex().getId())
-        .collect(Collectors.toList()));
-
-    // return vertices that are not marked as additional tagged outputs
-    return new HashSet<>(outputVertices.stream()
-      .filter(vertexId -> !additionalOutputVertices.contains(vertexId))
-      .collect(Collectors.toList()));
   }
 
   /**
@@ -519,66 +413,42 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
    * @param irVertex                source irVertex
    * @param outEdgesToChildrenTasks outgoing edges to child tasks
    * @param dataTransferFactory     dataTransferFactory
-   * @param taggedOutputs           tag to vertex id map
    * @return OutputWriters for main children tasks
    */
-  private List<OutputWriter> getMainChildrenTaskWriters(final IRVertex irVertex,
-                                                        final List<StageEdge> outEdgesToChildrenTasks,
-                                                        final DataTransferFactory dataTransferFactory,
-                                                        final Map<String, String> taggedOutputs) {
+  private List<OutputWriter> getExternalMainOutputs(final IRVertex irVertex,
+                                                   final List<StageEdge> outEdgesToChildrenTasks,
+                                                   final DataTransferFactory dataTransferFactory) {
     return outEdgesToChildrenTasks
       .stream()
-      .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
-      .filter(outEdge -> !taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
+      .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
+      .filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
       .map(outEdgeForThisVertex -> dataTransferFactory
         .createWriter(taskId, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex))
       .collect(Collectors.toList());
   }
 
-  /**
-   * Return inter-task OutputWriters associated with additional output tags.
-   *
-   * @param irVertex                source irVertex
-   * @param outEdgesToChildrenTasks outgoing edges to child tasks
-   * @param dataTransferFactory     dataTransferFactory
-   * @param taggedOutputs           tag to vertex id map
-   * @return additional tag to OutputWriters map.
-   */
-  private Map<String, OutputWriter> getAdditionalChildrenTaskWriters(final IRVertex irVertex,
-                                                                     final List<StageEdge> outEdgesToChildrenTasks,
-                                                                     final DataTransferFactory dataTransferFactory,
-                                                                     final Map<String, String> taggedOutputs) {
-    final Map<String, OutputWriter> additionalChildrenTaskWriters = new HashMap<>();
 
-    outEdgesToChildrenTasks
-        .stream()
-        .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
-        .filter(outEdge -> taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
-        .forEach(outEdgeForThisVertex -> {
-          final String tag = taggedOutputs.entrySet().stream()
-            .filter(e -> e.getValue().equals(outEdgeForThisVertex.getDstIRVertex().getId()))
-            .findAny().orElseThrow(() -> new RuntimeException("Unexpected error while finding tag"))
-            .getKey();
-          additionalChildrenTaskWriters.put(tag,
-              dataTransferFactory.createWriter(taskId, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex));
-        });
-
-    return additionalChildrenTaskWriters;
+  private Optional<Readable> getSourceVertexReader(final IRVertex irVertex,
+                                                   final Map<String, Readable> irVertexIdToReadable) {
+    if (irVertex instanceof SourceVertex) {
+      final Readable readable = irVertexIdToReadable.get(irVertex.getId());
+      if (readable == null) {
+        throw new IllegalStateException(irVertex.toString());
+      }
+      return Optional.of(readable);
+    } else {
+      return Optional.empty();
+    }
   }
 
-  private List<VertexHarness> getChildrenHarnesses(final IRVertex irVertex,
-                                                   final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
-                                                   final Map<String, VertexHarness> vertexIdToHarness) {
-    final List<VertexHarness> childrenHandlers = irVertexDag.getChildren(irVertex.getId())
+  private List<InputReader> getParentTaskReaders(final int taskIndex,
+                                                 final List<StageEdge> inEdgesFromParentTasks,
+                                                 final DataTransferFactory dataTransferFactory) {
+    return inEdgesFromParentTasks
       .stream()
-      .map(IRVertex::getId)
-      .map(vertexIdToHarness::get)
+      .map(inEdgeForThisVertex -> dataTransferFactory
+        .createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(), inEdgeForThisVertex))
       .collect(Collectors.toList());
-    if (childrenHandlers.stream().anyMatch(harness -> harness == null)) {
-      // Sanity check: there shouldn't be a null harness.
-      throw new IllegalStateException(childrenHandlers.toString());
-    }
-    return childrenHandlers;
   }
 
   ////////////////////////////////////////////// Transform-specific helper methods
@@ -612,7 +482,7 @@ private void closeTransform(final VertexHarness vertexHarness) {
 
   ////////////////////////////////////////////// Misc
 
-  private void setIRVertexPutOnHold(final IRVertex irVertex) {
+  public void setIRVertexPutOnHold(final IRVertex irVertex) {
     idOfVertexPutOnHold = irVertex.getId();
   }
 
@@ -634,17 +504,20 @@ private void finalizeOutputWriters(final VertexHarness vertexHarness) {
     });
 
     // finalize OutputWriters for additional tagged children
-    vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriter -> {
-      outputWriter.close();
-
-      final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
-      writtenBytes.ifPresent(writtenBytesList::add);
+    vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriters -> {
+      outputWriters.forEach(outputWriter -> {
+        outputWriter.close();
+        final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
+        writtenBytes.ifPresent(writtenBytesList::add);
+      });
     });
 
     long totalWrittenBytes = 0;
     for (final Long writtenBytes : writtenBytesList) {
       totalWrittenBytes += writtenBytes;
     }
+
+    // TODO #236: Decouple metric collection and sending logic
     metricMessageSender.send("TaskMetric", taskId,
       "writtenBytes", SerializationUtils.serialize(totalWrittenBytes));
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
index 39cb88dc6..a6a73d4d9 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
@@ -15,17 +15,14 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
 import org.apache.nemo.runtime.executor.datatransfer.OutputWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * Captures the relationship between a non-source IRVertex's outputCollector, and mainTagChildren vertices.
@@ -35,48 +32,20 @@
 
   // IRVertex and transform-specific information
   private final IRVertex irVertex;
-  private final OutputCollectorImpl outputCollector;
+  private final OutputCollector outputCollector;
   private final Transform.Context context;
-  private final List<VertexHarness> mainTagChildren;
-
-  // These lists can be empty
-  private final Map<String, VertexHarness> additionalTagOutputChildren;
-  private final Map<String, String> tagToAdditionalChildrenId;
-  private final List<OutputWriter> writersToMainChildrenTasks;
-  private final Map<String, OutputWriter> writersToAdditionalChildrenTasks;
+  private final List<OutputWriter> externalOutputWriter;
+  private final Map<String, List<OutputWriter>> externalAdditionalOutputWriter;
 
   VertexHarness(final IRVertex irVertex,
-                final OutputCollectorImpl outputCollector,
-                final List<VertexHarness> children,
-                final List<Boolean> isAdditionalTagOutputs,
-                final List<OutputWriter> writersToMainChildrenTasks,
-                final Map<String, OutputWriter> writersToAdditionalChildrenTasks,
-                final Transform.Context context) {
+                final OutputCollector outputCollector,
+                final Transform.Context context,
+                final List<OutputWriter> externalOutputWriter,
+                final Map<String, List<OutputWriter>> externalAdditionalOutputWriter) {
     this.irVertex = irVertex;
     this.outputCollector = outputCollector;
-    if (children.size() != isAdditionalTagOutputs.size()) {
-      throw new IllegalStateException(irVertex.toString());
-    }
-    final Map<String, String> taggedOutputMap = context.getTagToAdditionalChildren();
-    final Map<String, VertexHarness> tagged = new HashMap<>();
-
-    // Classify input type for intra-task children
-    for (int i = 0; i < children.size(); i++) {
-      final VertexHarness child = children.get(i);
-      if (isAdditionalTagOutputs.get(i)) {
-        taggedOutputMap.entrySet().stream()
-          .filter(kv -> child.getIRVertex().getId().equals(kv.getValue()))
-          .forEach(kv -> tagged.put(kv.getKey(), child));
-      }
-    }
-
-    this.tagToAdditionalChildrenId = context.getTagToAdditionalChildren();
-    this.additionalTagOutputChildren = tagged;
-    final List<VertexHarness> mainTagChildrenTmp = new ArrayList<>(children);
-    mainTagChildrenTmp.removeAll(additionalTagOutputChildren.values());
-    this.mainTagChildren = mainTagChildrenTmp;
-    this.writersToMainChildrenTasks = writersToMainChildrenTasks;
-    this.writersToAdditionalChildrenTasks = writersToAdditionalChildrenTasks;
+    this.externalOutputWriter = externalOutputWriter;
+    this.externalAdditionalOutputWriter = externalAdditionalOutputWriter;
     this.context = context;
   }
 
@@ -88,45 +57,31 @@ IRVertex getIRVertex() {
   }
 
   /**
-   * @return OutputCollector of this irVertex.
+   * @return id of irVertex.
    */
-  OutputCollectorImpl getOutputCollector() {
-    return outputCollector;
+  String getId() {
+    return irVertex.getId();
   }
 
   /**
-   * @return mainTagChildren harnesses.
-   */
-  List<VertexHarness> getMainTagChildren() {
-    return mainTagChildren;
-  }
-
-  /**
-   * @return map of tagged output mainTagChildren. (empty if none exists)
-   */
-  public Map<String, VertexHarness> getAdditionalTagOutputChildren() {
-    return additionalTagOutputChildren;
-  }
-
-  /**
-   * @return map of tag to additional children id.
+   * @return OutputCollector of this irVertex.
    */
-  public Map<String, String> getTagToAdditionalChildrenId() {
-    return tagToAdditionalChildrenId;
+  OutputCollector getOutputCollector() {
+    return outputCollector;
   }
 
   /**
    * @return OutputWriters for main outputs of this irVertex. (empty if none exists)
    */
   List<OutputWriter> getWritersToMainChildrenTasks() {
-    return writersToMainChildrenTasks;
+    return externalOutputWriter;
   }
 
   /**
    * @return OutputWriters for additional tagged outputs of this irVertex. (empty if none exists)
    */
-  Map<String, OutputWriter> getWritersToAdditionalChildrenTasks() {
-    return writersToAdditionalChildrenTasks;
+  Map<String, List<OutputWriter>> getWritersToAdditionalChildrenTasks() {
+    return externalAdditionalOutputWriter;
   }
 
   /**
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
index 1261569ad..8ccb63395 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
@@ -24,8 +24,6 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.*;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -39,19 +37,17 @@
 @PrepareForTest({BroadcastManagerWorker.class})
 public class TransformContextImplTest {
   private Transform.Context context;
-  private final Map<String, String> taggedOutputs = new HashMap<>();
 
   @Before
   public void setUp() {
     final BroadcastManagerWorker broadcastManagerWorker = mock(BroadcastManagerWorker.class);
     when(broadcastManagerWorker.get("a")).thenReturn("b");
-    this.context = new TransformContextImpl(broadcastManagerWorker, taggedOutputs);
+    this.context = new TransformContextImpl(broadcastManagerWorker);
   }
 
   @Test
   public void testContextImpl() {
     assertEquals("b", this.context.getBroadcastVariable("a"));
-    assertEquals(this.taggedOutputs, this.context.getTagToAdditionalChildren());
 
     final String sampleText = "test_text";
 
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 8e62947a8..9cd2b464d 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -284,9 +284,13 @@ public void testTwoOperatorsWithBroadcastVariable() {
    * emit(element) and emit(dstVertexId, element) used together. emit(element) routes results to main output children,
    * and emit(dstVertexId, element) routes results to corresponding additional output children.
    */
-  @Test(timeout = 5000)
+  @Test(timeout=5000)
   public void testAdditionalOutputs() throws Exception {
-    final IRVertex routerVertex = new OperatorVertex(new RoutingTransform());
+    final String additionalTag1 = "bonus1";
+    final String additionalTag2 = "bonus2";
+
+    final IRVertex routerVertex = new OperatorVertex(
+      new RoutingTransform(Arrays.asList(additionalTag1, additionalTag2)));
     final IRVertex mainVertex= new OperatorVertex(new RelayTransform());
     final IRVertex bonusVertex1 = new OperatorVertex(new RelayTransform());
     final IRVertex bonusVertex2 = new OperatorVertex(new RelayTransform());
@@ -295,8 +299,8 @@ public void testAdditionalOutputs() throws Exception {
     final RuntimeEdge<IRVertex> edge2 = createEdge(routerVertex, bonusVertex1, "edge-2");
     final RuntimeEdge<IRVertex> edge3 = createEdge(routerVertex, bonusVertex2, "edge-3");
 
-    edge2.getExecutionProperties().put(AdditionalOutputTagProperty.of("bonus1"));
-    edge3.getExecutionProperties().put(AdditionalOutputTagProperty.of("bonus2"));
+    edge2.getExecutionProperties().put(AdditionalOutputTagProperty.of(additionalTag1));
+    edge3.getExecutionProperties().put(AdditionalOutputTagProperty.of(additionalTag2));
 
     final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
         .addVertex(routerVertex)
@@ -514,12 +518,15 @@ public void close() {
    */
   private class RoutingTransform implements Transform<Integer, Integer> {
     private OutputCollector<Integer> outputCollector;
-    private Map<String, String> tagToVertex;
+    private final Collection<String> additionalTags;
+
+    public RoutingTransform(final Collection<String> additionalTags) {
+      this.additionalTags = additionalTags;
+    }
 
     @Override
     public void prepare(final Context context, OutputCollector<Integer> outputCollector) {
       this.outputCollector = outputCollector;
-      this.tagToVertex = context.getTagToAdditionalChildren();
     }
 
     @Override
@@ -530,7 +537,7 @@ public void onData(final Integer element) {
         outputCollector.emit(i);
       } else {
         // route to all additional outputs. Invoked if user calls c.output(tupleTag, element)
-        tagToVertex.values().forEach(vertex -> outputCollector.emit(vertex, i));
+        additionalTags.forEach(tag -> outputCollector.emit(tag, i));
       }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services