You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@nemo.apache.org by GitBox <gi...@apache.org> on 2020/11/24 09:15:56 UTC

[GitHub] [incubator-nemo] johnyangk commented on a change in pull request #293: [NEMO-436] (WIP) Dynamic re-configuration based on the Sampled Metric Data

johnyangk commented on a change in pull request #293:
URL: https://github.com/apache/incubator-nemo/pull/293#discussion_r529312252



##########
File path: common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java
##########
@@ -439,13 +441,23 @@ void addEncodingCompressionCheckers() {
           .collect(Collectors.toList());
 
         if (!nonStreamVertexEdge.isEmpty()) {
-          if (1 != nonStreamVertexEdge.stream()
-            .map(e -> e.getPropertyValue(EncoderProperty.class).get().getClass()).distinct().count()) {
-            return failure("Incompatible encoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
+          Set<? extends Class<? extends EncoderFactory>> encoderProperties = nonStreamVertexEdge.stream().map(e
+            -> e.getPropertyValue(EncoderProperty.class).get().getClass()).collect(Collectors.toSet());
+          if (1 != nonStreamVertexEdge.stream().map(e -> e.getPropertyValue(EncoderProperty.class).get().getClass())
+            .distinct().count()) {
+            if (!encoderProperties.contains(EncoderFactory.DummyEncoderFactory.class)
+              || encoderProperties.size() != 2) {

Review comment:
       Can you leave a short comment on this magic number 2?

##########
File path: compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/runtime/DynamicTaskSizingRuntimePass.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.runtime;
+
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Runtime pass for Dynamic Task Sizing policy.
+ */
+public final class DynamicTaskSizingRuntimePass extends RunTimePass<Map<String, Long>> {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicTaskSizingRuntimePass.class.getName());
+  private final String mapKey = "opt.parallelism";
+
+  /**
+   * Default Constructor.
+   */
+  public DynamicTaskSizingRuntimePass() {
+  }
+
+  @Override
+  public IRDAG apply(final IRDAG irdag, final Message<Map<String, Long>> mapMessage) {
+    final Set<IREdge> edgesToOptimize = mapMessage.getExaminedEdges();
+    final Set<IRVertex> stageVertices = edgesToOptimize.stream().map(IREdge::getDst).collect(Collectors.toSet());
+    irdag.topologicalDo(v -> {
+      if (stageVertices.contains(v)) {
+        edgesToOptimize.addAll(irdag.getIncomingEdgesOf(v));
+      }
+    });
+    LOG.info("Examined edges {}", edgesToOptimize.stream().map(IREdge::getId).collect(Collectors.toList()));
+
+    final IREdge representativeEdge = edgesToOptimize.iterator().next();
+    // double check
+    if (!representativeEdge.getDst().getPropertyValue(EnableDynamicTaskSizingProperty.class).orElse(false)) {
+      return irdag;
+    }
+    final Map<String, Long> messageValue = mapMessage.getMessageValue();
+    LOG.info("messageValue {}", messageValue);
+    final int optimizedTaskSizeRatio = messageValue.get(mapKey).intValue();
+    final int partitionerProperty = getPartitionerProperty(irdag);
+    for (IREdge edge : edgesToOptimize) {
+      if (!edge.getPropertyValue(PartitionerProperty.class).get().right().equals(partitionerProperty)
+      && edge.getPropertyValue(CommunicationPatternProperty.class).get()
+        .equals(CommunicationPatternProperty.Value.SHUFFLE)) {
+        throw new IllegalArgumentException();
+      }
+    }
+    final int partitionUnit = partitionerProperty / optimizedTaskSizeRatio;
+    edgesToOptimize.forEach(irEdge -> setSubPartitionProperty(irEdge, partitionUnit, partitionerProperty));
+    edgesToOptimize.forEach(irEdge -> setDstVertexParallelismProperty(irEdge, partitionUnit, partitionerProperty));
+    return irdag;
+  }
+
+  private int getPartitionerProperty(final IRDAG dag) {
+    long jobSizeInBytes = dag.getInputSize();
+    long jobSizeInGB = jobSizeInBytes / (1024 * 1024 * 1024);
+    if (1 <= jobSizeInGB && jobSizeInGB < 10) {
+      return 1024;
+    } else if (10 <= jobSizeInGB && jobSizeInGB < 100) {
+      return 2048;
+    } else {
+      return 4096;
+    }
+  }
+
+  private void setSubPartitionProperty(final IREdge edge, final int growingFactor, final int partitionerProperty) {
+    final List<KeyRange> keyRanges = edge.getPropertyValue(SubPartitionSetProperty.class).orElse(new ArrayList<>());
+    if (keyRanges.isEmpty()) {
+      return;
+    }
+    final int start = (int) keyRanges.get(0).rangeBeginInclusive();

Review comment:
       Can you leave a short comment on this magic number 0?

##########
File path: compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/prophet/ParallelismProphet.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.backend.nemo.prophet;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.scheduler.SimulationScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * A prophet for Parallelism.
+ */
+public final class ParallelismProphet implements Prophet<String, Long> {
+  private static final Logger LOG = LoggerFactory.getLogger(ParallelismProphet.class.getName());
+  private final SimulationScheduler simulationScheduler;
+  private final PhysicalPlanGenerator physicalPlanGenerator;
+  private final IRDAG currentIRDAG;
+  private final PhysicalPlan currentPhysicalPlan;
+  private final Set<StageEdge> edgesToOptimize;
+  private final int partitionerProperty;
+
+  /**
+   * Default constructor for ParallelismProphet.
+   * @param irdag                   current IRDAG
+   * @param physicalPlan            current PhysicalPlan
+   * @param simulationScheduler     SimulationScheduler to launch
+   * @param physicalPlanGenerator   PhysicalPlanGenerator to make physical plan which will be launched by
+   *                                simulation scheduler
+   * @param edgesToOptimize         edges to optimize at runtime pass
+   */
+  public ParallelismProphet(final IRDAG irdag, final PhysicalPlan physicalPlan,
+                            final SimulationScheduler simulationScheduler,
+                            final PhysicalPlanGenerator physicalPlanGenerator,
+                            final Set<StageEdge> edgesToOptimize) {
+    this.currentIRDAG = irdag;
+    this.currentPhysicalPlan = physicalPlan;
+    this.simulationScheduler = simulationScheduler;
+    this.physicalPlanGenerator = physicalPlanGenerator;
+    this.edgesToOptimize = edgesToOptimize;
+    this.partitionerProperty = calculatePartitionerProperty(edgesToOptimize);
+  }
+
+  /**
+   * Launch SimulationScheduler and find out the optimal parallelism.
+   * @return  Map of one element, with key "opt.parallelism".
+   */
+  @Override
+  public Map<String, Long> calculate() {
+    final Map<String, Long> result = new HashMap<>();
+    final List<PhysicalPlan> listOfPhysicalPlans = new ArrayList<>(); // when to update here?
+    for (int i = 0; i < 7; i++) {
+      final int parallelism = (int) (partitionerProperty / Math.pow(2, i));
+      PhysicalPlan newPlan = makePhysicalPlanForSimulation(parallelism, edgesToOptimize, currentIRDAG);
+      listOfPhysicalPlans.add(newPlan);
+    }
+    final List<Pair<Integer, Long>> listOfParallelismToDurationPair = listOfPhysicalPlans.stream()
+      .map(this::launchSimulationForPlan)
+      .filter(pair -> pair.right() > 0.5)
+      .collect(Collectors.toList());
+    final Pair<Integer, Long> pairWithMinDuration =
+      Collections.min(listOfParallelismToDurationPair, Comparator.comparing(p -> p.right()));
+    result.put("opt.parallelism", pairWithMinDuration.left().longValue());
+    return result;
+  }
+
+  /**
+   * Simulate the given physical plan.
+   * @param physicalPlan      physical plan(with only one stage) to simulate
+   * @return                  Pair of Integer and Long. Integer value indicates the simulated parallelism, and
+   *                          Long value is simulated job(=stage) duration.
+   */
+  private synchronized Pair<Integer, Long> launchSimulationForPlan(final PhysicalPlan physicalPlan) {
+    this.simulationScheduler.schedulePlan(physicalPlan, 1);

Review comment:
       Can you leave a short comment on this magic number 1?

##########
File path: compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/runtime/DynamicTaskSizingRuntimePass.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.runtime;
+
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Runtime pass for Dynamic Task Sizing policy.
+ */
+public final class DynamicTaskSizingRuntimePass extends RunTimePass<Map<String, Long>> {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicTaskSizingRuntimePass.class.getName());
+  private final String mapKey = "opt.parallelism";
+
+  /**
+   * Default Constructor.
+   */
+  public DynamicTaskSizingRuntimePass() {
+  }
+
+  @Override
+  public IRDAG apply(final IRDAG irdag, final Message<Map<String, Long>> mapMessage) {
+    final Set<IREdge> edgesToOptimize = mapMessage.getExaminedEdges();
+    final Set<IRVertex> stageVertices = edgesToOptimize.stream().map(IREdge::getDst).collect(Collectors.toSet());
+    irdag.topologicalDo(v -> {
+      if (stageVertices.contains(v)) {
+        edgesToOptimize.addAll(irdag.getIncomingEdgesOf(v));
+      }
+    });
+    LOG.info("Examined edges {}", edgesToOptimize.stream().map(IREdge::getId).collect(Collectors.toList()));
+
+    final IREdge representativeEdge = edgesToOptimize.iterator().next();
+    // double check
+    if (!representativeEdge.getDst().getPropertyValue(EnableDynamicTaskSizingProperty.class).orElse(false)) {
+      return irdag;
+    }
+    final Map<String, Long> messageValue = mapMessage.getMessageValue();
+    LOG.info("messageValue {}", messageValue);
+    final int optimizedTaskSizeRatio = messageValue.get(mapKey).intValue();
+    final int partitionerProperty = getPartitionerProperty(irdag);
+    for (IREdge edge : edgesToOptimize) {
+      if (!edge.getPropertyValue(PartitionerProperty.class).get().right().equals(partitionerProperty)
+      && edge.getPropertyValue(CommunicationPatternProperty.class).get()
+        .equals(CommunicationPatternProperty.Value.SHUFFLE)) {
+        throw new IllegalArgumentException();
+      }
+    }
+    final int partitionUnit = partitionerProperty / optimizedTaskSizeRatio;
+    edgesToOptimize.forEach(irEdge -> setSubPartitionProperty(irEdge, partitionUnit, partitionerProperty));
+    edgesToOptimize.forEach(irEdge -> setDstVertexParallelismProperty(irEdge, partitionUnit, partitionerProperty));
+    return irdag;
+  }
+
+  private int getPartitionerProperty(final IRDAG dag) {
+    long jobSizeInBytes = dag.getInputSize();
+    long jobSizeInGB = jobSizeInBytes / (1024 * 1024 * 1024);
+    if (1 <= jobSizeInGB && jobSizeInGB < 10) {
+      return 1024;
+    } else if (10 <= jobSizeInGB && jobSizeInGB < 100) {

Review comment:
       Names for these numbers would be useful.
   
   e.g., 
   static int TWO_GB = 2048;
   static int SMALL_BUCKET(?) = 10;

##########
File path: compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/prophet/Prophet.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.backend.nemo.prophet;
+
+import java.util.Map;
+
+/**
+ * A prophet class for dynamic optimization.

Review comment:
       Any update on this? 

##########
File path: common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java
##########
@@ -439,13 +441,23 @@ void addEncodingCompressionCheckers() {
           .collect(Collectors.toList());
 
         if (!nonStreamVertexEdge.isEmpty()) {
-          if (1 != nonStreamVertexEdge.stream()
-            .map(e -> e.getPropertyValue(EncoderProperty.class).get().getClass()).distinct().count()) {
-            return failure("Incompatible encoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
+          Set<? extends Class<? extends EncoderFactory>> encoderProperties = nonStreamVertexEdge.stream().map(e
+            -> e.getPropertyValue(EncoderProperty.class).get().getClass()).collect(Collectors.toSet());
+          if (1 != nonStreamVertexEdge.stream().map(e -> e.getPropertyValue(EncoderProperty.class).get().getClass())
+            .distinct().count()) {
+            if (!encoderProperties.contains(EncoderFactory.DummyEncoderFactory.class)
+              || encoderProperties.size() != 2) {
+              return failure("Incompatible encoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
+            }
           }
-          if (1 != nonStreamVertexEdge.stream()
-            .map(e -> e.getPropertyValue(DecoderProperty.class).get().getClass()).distinct().count()) {
-            return failure("Incompatible decoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
+          Set<? extends Class<? extends DecoderFactory>> decoderProperties = nonStreamVertexEdge.stream().map(e
+            -> e.getPropertyValue(DecoderProperty.class).get().getClass()).collect(Collectors.toSet());
+          if (1 != nonStreamVertexEdge.stream().map(e -> e.getPropertyValue(DecoderProperty.class).get().getClass())

Review comment:
       Can you leave a short comment on this magic number 1?

##########
File path: common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java
##########
@@ -439,13 +441,23 @@ void addEncodingCompressionCheckers() {
           .collect(Collectors.toList());
 
         if (!nonStreamVertexEdge.isEmpty()) {
-          if (1 != nonStreamVertexEdge.stream()
-            .map(e -> e.getPropertyValue(EncoderProperty.class).get().getClass()).distinct().count()) {
-            return failure("Incompatible encoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
+          Set<? extends Class<? extends EncoderFactory>> encoderProperties = nonStreamVertexEdge.stream().map(e
+            -> e.getPropertyValue(EncoderProperty.class).get().getClass()).collect(Collectors.toSet());
+          if (1 != nonStreamVertexEdge.stream().map(e -> e.getPropertyValue(EncoderProperty.class).get().getClass())
+            .distinct().count()) {
+            if (!encoderProperties.contains(EncoderFactory.DummyEncoderFactory.class)
+              || encoderProperties.size() != 2) {
+              return failure("Incompatible encoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
+            }
           }
-          if (1 != nonStreamVertexEdge.stream()
-            .map(e -> e.getPropertyValue(DecoderProperty.class).get().getClass()).distinct().count()) {
-            return failure("Incompatible decoders in " + Util.stringifyIREdgeIds(nonStreamVertexEdge));
+          Set<? extends Class<? extends DecoderFactory>> decoderProperties = nonStreamVertexEdge.stream().map(e
+            -> e.getPropertyValue(DecoderProperty.class).get().getClass()).collect(Collectors.toSet());
+          if (1 != nonStreamVertexEdge.stream().map(e -> e.getPropertyValue(DecoderProperty.class).get().getClass())
+            .distinct().count()) {
+            if (!decoderProperties.contains(DecoderFactory.DummyDecoderFactory.class)
+              || encoderProperties.size() != 2) {

Review comment:
       Ditto.

##########
File path: compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoPlanRewriter.java
##########
@@ -101,34 +122,41 @@ public PhysicalPlan rewrite(final PhysicalPlan currentPhysicalPlan, final int me
     // Optimize using the Message
     final Message message = new Message(messageId, examiningEdges, aggregatedData);
     final IRDAG newIRDAG = nemoOptimizer.optimizeAtRunTime(currentIRDAG, message);
+    this.setCurrentIRDAG(newIRDAG);
 
     // Re-compile the IRDAG into a physical plan
     final PhysicalPlan newPhysicalPlan = nemoBackend.compile(newIRDAG);
 
     // Update the physical plan and return
     final List<Stage> currentStages = currentPhysicalPlan.getStageDAG().getTopologicalSort();
     final List<Stage> newStages = newPhysicalPlan.getStageDAG().getTopologicalSort();
-    for (int i = 0; i < currentStages.size(); i++) {
+    IntStream.range(0, currentStages.size()).forEachOrdered(i -> {
       final ExecutionPropertyMap<VertexExecutionProperty> newProperties = newStages.get(i).getExecutionProperties();
       currentStages.get(i).setExecutionProperties(newProperties);
-    }
+      newProperties.get(ParallelismProperty.class).ifPresent(newParallelism -> {
+        currentStages.get(i).getTaskIndices().clear();
+        currentStages.get(i).getTaskIndices().addAll(IntStream.range(0, newParallelism).boxed()
+          .collect(Collectors.toList()));
+        IntStream.range(currentStages.get(i).getVertexIdToReadables().size(), newParallelism).forEach(newIdx ->
+          currentStages.get(i).getVertexIdToReadables().add(new HashMap<>()));
+      });
+    });
     return currentPhysicalPlan;
   }
 
   @Override
-  public void accumulate(final int messageId, final Object data) {
+  public void accumulate(final int messageId, final Set<StageEdge> targetEdges, final Object data) {
+    final Prophet prophet;
+    final List<ControlMessage.RunTimePassMessageEntry> parsedData = (List<ControlMessage.RunTimePassMessageEntry>) data;
+    if (!parsedData.isEmpty() && parsedData.get(0).getKey().equals("NONE")) {

Review comment:
       Can you leave a short comment on this magic number 0?

##########
File path: compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/prophet/ParallelismProphet.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.backend.nemo.prophet;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.scheduler.SimulationScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * A prophet for Parallelism.

Review comment:
       It would be great if a brief description of the algorithm were provided (or references to related work).
   e.g., what is the benefit of Math.pow(2, i)?

##########
File path: compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/prophet/ParallelismProphet.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.backend.nemo.prophet;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.scheduler.SimulationScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * A prophet for Parallelism.
+ */
+public final class ParallelismProphet implements Prophet<String, Long> {
+  private static final Logger LOG = LoggerFactory.getLogger(ParallelismProphet.class.getName());
+  private final SimulationScheduler simulationScheduler;
+  private final PhysicalPlanGenerator physicalPlanGenerator;
+  private final IRDAG currentIRDAG;
+  private final PhysicalPlan currentPhysicalPlan;
+  private final Set<StageEdge> edgesToOptimize;
+  private final int partitionerProperty;
+
+  /**
+   * Default constructor for ParallelismProphet.
+   * @param irdag                   current IRDAG
+   * @param physicalPlan            current PhysicalPlan
+   * @param simulationScheduler     SimulationScheduler to launch
+   * @param physicalPlanGenerator   PhysicalPlanGenerator to make physical plan which will be launched by
+   *                                simulation scheduler
+   * @param edgesToOptimize         edges to optimize at runtime pass
+   */
+  public ParallelismProphet(final IRDAG irdag, final PhysicalPlan physicalPlan,
+                            final SimulationScheduler simulationScheduler,
+                            final PhysicalPlanGenerator physicalPlanGenerator,
+                            final Set<StageEdge> edgesToOptimize) {
+    this.currentIRDAG = irdag;
+    this.currentPhysicalPlan = physicalPlan;
+    this.simulationScheduler = simulationScheduler;
+    this.physicalPlanGenerator = physicalPlanGenerator;
+    this.edgesToOptimize = edgesToOptimize;
+    this.partitionerProperty = calculatePartitionerProperty(edgesToOptimize);
+  }
+
+  /**
+   * Launch SimulationScheduler and find out the optimal parallelism.
+   * @return  Map of one element, with key "opt.parallelism".
+   */
+  @Override
+  public Map<String, Long> calculate() {
+    final Map<String, Long> result = new HashMap<>();
+    final List<PhysicalPlan> listOfPhysicalPlans = new ArrayList<>(); // when to update here?
+    for (int i = 0; i < 7; i++) {
+      final int parallelism = (int) (partitionerProperty / Math.pow(2, i));
+      PhysicalPlan newPlan = makePhysicalPlanForSimulation(parallelism, edgesToOptimize, currentIRDAG);
+      listOfPhysicalPlans.add(newPlan);
+    }
+    final List<Pair<Integer, Long>> listOfParallelismToDurationPair = listOfPhysicalPlans.stream()
+      .map(this::launchSimulationForPlan)
+      .filter(pair -> pair.right() > 0.5)

Review comment:
       Can you leave a short comment on this magic number 0.5?

##########
File path: compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/prophet/ParallelismProphet.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.backend.nemo.prophet;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.runtime.common.metric.JobMetric;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.master.metric.MetricStore;
+import org.apache.nemo.runtime.master.scheduler.SimulationScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * A prophet for Parallelism.
+ */
+public final class ParallelismProphet implements Prophet<String, Long> {
+  private static final Logger LOG = LoggerFactory.getLogger(ParallelismProphet.class.getName());
+  private final SimulationScheduler simulationScheduler;
+  private final PhysicalPlanGenerator physicalPlanGenerator;
+  private final IRDAG currentIRDAG;
+  private final PhysicalPlan currentPhysicalPlan;
+  private final Set<StageEdge> edgesToOptimize;
+  private final int partitionerProperty;
+
+  /**
+   * Default constructor for ParallelismProphet.
+   * @param irdag                   current IRDAG
+   * @param physicalPlan            current PhysicalPlan
+   * @param simulationScheduler     SimulationScheduler to launch
+   * @param physicalPlanGenerator   PhysicalPlanGenerator to make physical plan which will be launched by
+   *                                simulation scheduler
+   * @param edgesToOptimize         edges to optimize at runtime pass
+   */
+  public ParallelismProphet(final IRDAG irdag, final PhysicalPlan physicalPlan,
+                            final SimulationScheduler simulationScheduler,
+                            final PhysicalPlanGenerator physicalPlanGenerator,
+                            final Set<StageEdge> edgesToOptimize) {
+    this.currentIRDAG = irdag;
+    this.currentPhysicalPlan = physicalPlan;
+    this.simulationScheduler = simulationScheduler;
+    this.physicalPlanGenerator = physicalPlanGenerator;
+    this.edgesToOptimize = edgesToOptimize;
+    this.partitionerProperty = calculatePartitionerProperty(edgesToOptimize);
+  }
+
+  /**
+   * Launch SimulationScheduler and find out the optimal parallelism.
+   * @return  Map of one element, with key "opt.parallelism".
+   */
+  @Override
+  public Map<String, Long> calculate() {
+    final Map<String, Long> result = new HashMap<>();
+    final List<PhysicalPlan> listOfPhysicalPlans = new ArrayList<>(); // when to update here?
+    for (int i = 0; i < 7; i++) {

Review comment:
       Can you leave a short comment on this magic number 7?

##########
File path: compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/runtime/DynamicTaskSizingRuntimePass.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.runtime;
+
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Runtime pass for Dynamic Task Sizing policy.
+ */
+public final class DynamicTaskSizingRuntimePass extends RunTimePass<Map<String, Long>> {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicTaskSizingRuntimePass.class.getName());
+  // Key under which apply() looks up the optimized value in the message's value map.
+  private final String mapKey = "opt.parallelism";
+
+  /**
+   * Default constructor; takes no arguments and has an empty body.
+   */
+  public DynamicTaskSizingRuntimePass() {
+  }
+
+  /**
+   * Applies the value carried by the message to the edges examined for dynamic task sizing.
+   * The examined edges are first extended with every incoming edge of their destination vertices;
+   * each resulting edge is then passed to setSubPartitionProperty and setDstVertexParallelismProperty.
+   * Nothing is applied when the destination vertex of the first examined edge does not enable
+   * dynamic task sizing.
+   *
+   * @param irdag      the IRDAG to update.
+   * @param mapMessage the message providing the examined edges and a value map that is read
+   *                   under the "opt.parallelism" key.
+   * @return the same IRDAG instance that was passed in.
+   * @throws IllegalArgumentException if a SHUFFLE edge carries a partitioner value different from
+   *                                  the one returned by getPartitionerProperty.
+   */
+  @Override
+  public IRDAG apply(final IRDAG irdag, final Message<Map<String, Long>> mapMessage) {
+    final Set<IREdge> edgesToOptimize = mapMessage.getExaminedEdges();
+    // Destination vertices are collected before the set is extended below.
+    final Set<IRVertex> stageVertices = edgesToOptimize.stream().map(IREdge::getDst).collect(Collectors.toSet());
+    // NOTE(review): this adds to the set returned by getExaminedEdges(); whether that is the
+    // message's own internal set is not visible here - confirm the mutation is intended.
+    irdag.topologicalDo(v -> {
+      if (stageVertices.contains(v)) {
+        edgesToOptimize.addAll(irdag.getIncomingEdgesOf(v));
+      }
+    });
+    LOG.info("Examined edges {}", edgesToOptimize.stream().map(IREdge::getId).collect(Collectors.toList()));
+
+    // Assumes at least one examined edge; iterator().next() fails on an empty set.
+    final IREdge representativeEdge = edgesToOptimize.iterator().next();
+    // double check: skip the pass unless the destination vertex enables dynamic task sizing
+    if (!representativeEdge.getDst().getPropertyValue(EnableDynamicTaskSizingProperty.class).orElse(false)) {
+      return irdag;
+    }
+    final Map<String, Long> messageValue = mapMessage.getMessageValue();
+    LOG.info("messageValue {}", messageValue);
+    // NOTE(review): get(mapKey) yields null when the key is absent, which would fail on intValue().
+    final int optimizedTaskSizeRatio = messageValue.get(mapKey).intValue();
+    final int partitionerProperty = getPartitionerProperty(irdag);
+    // Reject any SHUFFLE edge whose partitioner value differs from the one computed above.
+    for (IREdge edge : edgesToOptimize) {
+      if (!edge.getPropertyValue(PartitionerProperty.class).get().right().equals(partitionerProperty)
+      && edge.getPropertyValue(CommunicationPatternProperty.class).get()
+        .equals(CommunicationPatternProperty.Value.SHUFFLE)) {
+        throw new IllegalArgumentException();
+      }
+    }
+    // Integer division; a ratio of 0 would raise ArithmeticException.
+    final int partitionUnit = partitionerProperty / optimizedTaskSizeRatio;
+    edgesToOptimize.forEach(irEdge -> setSubPartitionProperty(irEdge, partitionUnit, partitionerProperty));
+    edgesToOptimize.forEach(irEdge -> setDstVertexParallelismProperty(irEdge, partitionUnit, partitionerProperty));
+    return irdag;
+  }
+
+  private int getPartitionerProperty(final IRDAG dag) {
+    long jobSizeInBytes = dag.getInputSize();

Review comment:
       jobSize => sourceInputDataSize?
   
   "Job size" seems to suggest the sum of the input data and all intermediate data.

##########
File path: examples/beam/src/test/java/org/apache/nemo/examples/beam/AlternatingLeastSquareITCase.java
##########
@@ -73,6 +74,15 @@ public void testDefault() throws Exception {
       .build());
   }
 
+  @Test(timeout = 2400000)
+  public void testDTS() throws Exception {

Review comment:
       Awesome! Thanks for adding this test.

##########
File path: runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulatedTaskExecutor.java
##########
@@ -106,15 +106,19 @@ private long calculateExpectedTaskDuration(final Task task) {
       .filter(s -> s.get("properties").get("irDag").get("edges").size()
         == stageIRDAG.getEdges().size())  // same # of edges.
       .filter(s -> s.get("properties").get("executionProperties")
-        .get("org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty").asInt(0)
-        == task.getPropertyValue(ParallelismProperty.class).orElse(0))  // same parallelism.
+        .get("org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty").asInt(1)
+        == 32)  // sampling vertices have parallelism of 32
+      .filter(s -> s.get("properties").get("executionProperties")
+        .get("org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty").asBoolean())
       .map(s -> s.get("id").asText())
       .collect(Collectors.toSet());
 
+    final int simulationTaskParallelism = task.getPropertyValue(ParallelismProperty.class).orElse(1);

Review comment:
       Can you leave a short comment on this magic number 1?

##########
File path: runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulatedTaskExecutor.java
##########
@@ -106,15 +106,19 @@ private long calculateExpectedTaskDuration(final Task task) {
       .filter(s -> s.get("properties").get("irDag").get("edges").size()
         == stageIRDAG.getEdges().size())  // same # of edges.
       .filter(s -> s.get("properties").get("executionProperties")
-        .get("org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty").asInt(0)
-        == task.getPropertyValue(ParallelismProperty.class).orElse(0))  // same parallelism.
+        .get("org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty").asInt(1)
+        == 32)  // sampling vertices have parallelism of 32

Review comment:
       It would be even better if 32 were a global constant, so that we don't forget to update this number if we later decide to change the parallelism of the sampling vertices.

##########
File path: compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoPlanRewriter.java
##########
@@ -101,34 +122,41 @@ public PhysicalPlan rewrite(final PhysicalPlan currentPhysicalPlan, final int me
     // Optimize using the Message
     final Message message = new Message(messageId, examiningEdges, aggregatedData);
     final IRDAG newIRDAG = nemoOptimizer.optimizeAtRunTime(currentIRDAG, message);
+    this.setCurrentIRDAG(newIRDAG);
 
     // Re-compile the IRDAG into a physical plan
     final PhysicalPlan newPhysicalPlan = nemoBackend.compile(newIRDAG);
 
     // Update the physical plan and return
     final List<Stage> currentStages = currentPhysicalPlan.getStageDAG().getTopologicalSort();
     final List<Stage> newStages = newPhysicalPlan.getStageDAG().getTopologicalSort();
-    for (int i = 0; i < currentStages.size(); i++) {
+    IntStream.range(0, currentStages.size()).forEachOrdered(i -> {
       final ExecutionPropertyMap<VertexExecutionProperty> newProperties = newStages.get(i).getExecutionProperties();
       currentStages.get(i).setExecutionProperties(newProperties);
-    }
+      newProperties.get(ParallelismProperty.class).ifPresent(newParallelism -> {
+        currentStages.get(i).getTaskIndices().clear();
+        currentStages.get(i).getTaskIndices().addAll(IntStream.range(0, newParallelism).boxed()
+          .collect(Collectors.toList()));
+        IntStream.range(currentStages.get(i).getVertexIdToReadables().size(), newParallelism).forEach(newIdx ->
+          currentStages.get(i).getVertexIdToReadables().add(new HashMap<>()));
+      });
+    });
     return currentPhysicalPlan;
   }
 
   @Override
-  public void accumulate(final int messageId, final Object data) {
+  public void accumulate(final int messageId, final Set<StageEdge> targetEdges, final Object data) {
+    final Prophet prophet;
+    final List<ControlMessage.RunTimePassMessageEntry> parsedData = (List<ControlMessage.RunTimePassMessageEntry>) data;
+    if (!parsedData.isEmpty() && parsedData.get(0).getKey().equals("NONE")) {

Review comment:
       It would be better if "NONE" were a public static constant rather than an inline string literal.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org