You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by wo...@apache.org on 2018/08/20 04:37:36 UTC

[incubator-nemo] branch master updated: [NEMO-89] Clean up IRVertex#getClone() (#109)

This is an automated email from the ASF dual-hosted git repository.

wonook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new f1283ff  [NEMO-89] Clean up IRVertex#getClone() (#109)
f1283ff is described below

commit f1283fff6c6baabd38b487eac460a11e973aa3aa
Author: Arun Lakshman R <26...@users.noreply.github.com>
AuthorDate: Mon Aug 20 10:07:34 2018 +0530

    [NEMO-89] Clean up IRVertex#getClone() (#109)
    
     JIRA: [NEMO-89: Clean up IRVertex#getClone()](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-89)
    
    **Major changes:**
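    - `getClone()` has been moved into an interface (`edu.snu.nemo.common.Cloneable`)
    - Copy constructors have been added to the subclasses of `IRVertex`, and `getClone()` now delegates to them; collections held by a vertex are copied, while referenced objects (transforms, sources, inner DAG vertices) are shared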
    
    **Minor changes to note:**
    - NA
    
    **Tests for the changes:**
    - NA
    
    **Other comments:**
    - NA
    
    resolves NEMO-89 #resolve
---
 .../main/java/edu/snu/nemo/common/Cloneable.java   | 41 +++++++++++++
 .../edu/snu/nemo/common/ir/vertex/IRVertex.java    | 16 ++++-
 .../common/ir/vertex/InMemorySourceVertex.java     | 18 ++++--
 .../edu/snu/nemo/common/ir/vertex/LoopVertex.java  | 71 +++++++++++-----------
 .../ir/vertex/MetricCollectionBarrierVertex.java   | 20 ++++--
 .../snu/nemo/common/ir/vertex/OperatorVertex.java  | 13 +++-
 .../snu/nemo/common/ir/vertex/SourceVertex.java    | 15 +++++
 .../edu/snu/nemo/common/test/EmptyComponents.java  | 11 +++-
 .../beam/source/BeamBoundedSourceVertex.java       | 17 +++++-
 .../source/SparkDatasetBoundedSourceVertex.java    | 15 ++---
 .../source/SparkTextFileBoundedSourceVertex.java   | 13 ++--
 11 files changed, 185 insertions(+), 65 deletions(-)

diff --git a/common/src/main/java/edu/snu/nemo/common/Cloneable.java b/common/src/main/java/edu/snu/nemo/common/Cloneable.java
new file mode 100644
index 0000000..4a75b7d
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/Cloneable.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common;
+
+/**
+ * This interface is implemented by objects that can produce a copy of themselves.
+ * <p>
+ * Unlike {@link java.lang.Cloneable}, which declares no clone method at all, this interface
+ * exposes a {@link #getClone()} method whose return type is the type parameter {@code T}.
+ * Josh Bloch discusses the shortcomings of {@link java.lang.Cloneable} in "Effective Java":
+ * <a href="http://thefinestartist.com/effective-java/11">Override clone judiciously</a>.
+ * </p>
+ *
+ * @param <T> the type of the copy returned by {@link #getClone()}
+ */
+public interface Cloneable<T extends Cloneable<T>> {
+
+  /**
+   * Creates and returns a copy of this object.
+   * <p>
+   * The precise meaning of "copy" (e.g. shallow or deep) is defined by each implementing class.
+   * The general intent is that all fields of the object are copied.
+   * </p>
+   *
+   * @return a copy of this object; implementations may narrow the return type covariantly.
+   */
+  T getClone();
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index 09ebfbf..5810233 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.common.ir.IdManager;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.dag.Vertex;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.Cloneable;
 
 import java.io.Serializable;
 import java.util.Optional;
@@ -27,7 +28,7 @@ import java.util.Optional;
  * The basic unit of operation in a dataflow program, as well as the most important data structure in Nemo.
  * An IRVertex is created and modified in the compiler, and executed in the runtime.
  */
-public abstract class IRVertex extends Vertex {
+public abstract class IRVertex extends Vertex implements Cloneable<IRVertex> {
   private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
   private boolean stagePartitioned;
 
@@ -41,9 +42,16 @@ public abstract class IRVertex extends Vertex {
   }
 
   /**
-   * @return a clone elemnt of the IRVertex.
+   * Copy constructor: assigns a fresh vertex id, then copies the execution properties and stagePartitioned.
+   *
+   * @param that the source object for copying
    */
-  public abstract IRVertex getClone();
+  public IRVertex(final IRVertex that) {
+    super(IdManager.newVertexId());
+    this.executionProperties = ExecutionPropertyMap.of(this);
+    that.getExecutionProperties().forEachProperties(this::setProperty);
+    this.stagePartitioned = that.stagePartitioned;
+  }
 
   /**
    * Static function to copy executionProperties from a vertex to the other.
@@ -55,6 +63,7 @@ public abstract class IRVertex extends Vertex {
 
   /**
    * Set an executionProperty of the IRVertex.
+   *
    * @param executionProperty new execution property.
    * @return the IRVertex with the execution property set.
    */
@@ -65,6 +74,7 @@ public abstract class IRVertex extends Vertex {
 
   /**
    * Set an executionProperty of the IRVertex, permanently.
+   *
    * @param executionProperty new execution property.
    * @return the IRVertex with the execution property set.
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
index c73475b..99a4183 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
@@ -29,18 +29,28 @@ public final class InMemorySourceVertex<T> extends SourceVertex<T> {
   private Iterable<T> initializedSourceData;
 
   /**
-   * Constructor.
+   * Constructor for InMemorySourceVertex.
+   *
    * @param initializedSourceData the initial data object.
    */
   public InMemorySourceVertex(final Iterable<T> initializedSourceData) {
+    super();
     this.initializedSourceData = initializedSourceData;
   }
 
+  /**
+   * Copy constructor for InMemorySourceVertex; the source data is shared with {@code that}, not copied.
+   *
+   * @param that the source object for copying
+   */
+  public InMemorySourceVertex(final InMemorySourceVertex<T> that) {
+    super(that);
+    this.initializedSourceData = that.initializedSourceData;
+  }
+
   @Override
   public InMemorySourceVertex<T> getClone() {
-    final InMemorySourceVertex<T> that = new InMemorySourceVertex<>(this.initializedSourceData);
-    this.copyExecutionPropertiesTo(that);
-    return that;
+    return new InMemorySourceVertex<>(this);
   }
 
   @Override
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
index 96c7fca..7d25aee 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
@@ -31,59 +31,62 @@ import java.util.stream.Collectors;
  * IRVertex that contains a partial DAG that is iterative.
  */
 public final class LoopVertex extends IRVertex {
+
   private static int duplicateEdgeGroupId = 0;
-  private final DAGBuilder<IRVertex, IREdge> builder; // Contains DAG information
+  // Contains DAG information
+  private final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
   private final String compositeTransformFullName;
-
-  private final Map<IRVertex, Set<IREdge>> dagIncomingEdges; // for the initial iteration
-  private final Map<IRVertex, Set<IREdge>> iterativeIncomingEdges; // Edges from previous iterations connected internal.
-  private final Map<IRVertex, Set<IREdge>> nonIterativeIncomingEdges; // Edges from outside previous iterations.
-  private final Map<IRVertex, Set<IREdge>> dagOutgoingEdges; // for the final iteration
-  private final Map<IREdge, IREdge> edgeWithLoopToEdgeWithInternalVertex;
-  private final Map<IREdge, IREdge> edgeWithInternalVertexToEdgeWithLoop;
-
+  // Incoming edges for the initial iteration.
+  private final Map<IRVertex, Set<IREdge>> dagIncomingEdges = new HashMap<>();
+  // Edges from previous iterations, connected internally.
+  private final Map<IRVertex, Set<IREdge>> iterativeIncomingEdges = new HashMap<>();
+  // Edges from outside previous iterations.
+  private final Map<IRVertex, Set<IREdge>> nonIterativeIncomingEdges = new HashMap<>();
+  // Outgoing edges for the final iteration.
+  private final Map<IRVertex, Set<IREdge>> dagOutgoingEdges = new HashMap<>();
+  private final Map<IREdge, IREdge> edgeWithLoopToEdgeWithInternalVertex = new HashMap<>();
+  private final Map<IREdge, IREdge> edgeWithInternalVertexToEdgeWithLoop = new HashMap<>();
   private Integer maxNumberOfIterations;
   private IntPredicate terminationCondition;
 
   /**
    * The LoopVertex constructor.
+   *
    * @param compositeTransformFullName full name of the composite transform.
    */
   public LoopVertex(final String compositeTransformFullName) {
     super();
-    this.builder = new DAGBuilder<>();
     this.compositeTransformFullName = compositeTransformFullName;
-    this.dagIncomingEdges = new HashMap<>();
-    this.iterativeIncomingEdges = new HashMap<>();
-    this.nonIterativeIncomingEdges = new HashMap<>();
-    this.dagOutgoingEdges = new HashMap<>();
-    this.edgeWithLoopToEdgeWithInternalVertex = new HashMap<>();
-    this.edgeWithInternalVertexToEdgeWithLoop = new HashMap<>();
     this.maxNumberOfIterations = 1; // 1 is the default number of iterations.
     this.terminationCondition = (IntPredicate & Serializable) (integer -> false); // nothing much yet.
   }
 
-  @Override
-  public LoopVertex getClone() {
-    final LoopVertex newLoopVertex = new LoopVertex(compositeTransformFullName);
-
+  /**
+   * Copy constructor for LoopVertex; the inner DAG's vertices and edges are shared, not cloned.
+   *
+   * @param that the source object for copying
+   */
+  public LoopVertex(final LoopVertex that) {
+    super(that);
+    this.compositeTransformFullName = that.compositeTransformFullName; // Strings are immutable
     // Copy all elements to the clone
-    final DAG<IRVertex, IREdge> dagToCopy = this.getDAG();
+    final DAG<IRVertex, IREdge> dagToCopy = that.getDAG();
     dagToCopy.topologicalDo(v -> {
-      newLoopVertex.getBuilder().addVertex(v, dagToCopy);
-      dagToCopy.getIncomingEdgesOf(v).forEach(newLoopVertex.getBuilder()::connectVertices);
+      this.getBuilder().addVertex(v, dagToCopy);
+      dagToCopy.getIncomingEdgesOf(v).forEach(this.getBuilder()::connectVertices);
     });
-    this.dagIncomingEdges.forEach(((v, es) -> es.forEach(newLoopVertex::addDagIncomingEdge)));
-    this.iterativeIncomingEdges.forEach((v, es) -> es.forEach(newLoopVertex::addIterativeIncomingEdge));
-    this.nonIterativeIncomingEdges.forEach((v, es) -> es.forEach(newLoopVertex::addNonIterativeIncomingEdge));
-    this.dagOutgoingEdges.forEach(((v, es) -> es.forEach(newLoopVertex::addDagOutgoingEdge)));
-    this.edgeWithLoopToEdgeWithInternalVertex.forEach((eLoop, eInternal)
-        -> newLoopVertex.mapEdgeWithLoop(eLoop, eInternal));
-    newLoopVertex.setMaxNumberOfIterations(maxNumberOfIterations);
-    newLoopVertex.setTerminationCondition(terminationCondition);
-
-    this.copyExecutionPropertiesTo(newLoopVertex);
-    return newLoopVertex;
+    that.dagIncomingEdges.forEach((v, es) -> es.forEach(this::addDagIncomingEdge));
+    that.iterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addIterativeIncomingEdge));
+    that.nonIterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addNonIterativeIncomingEdge));
+    that.dagOutgoingEdges.forEach((v, es) -> es.forEach(this::addDagOutgoingEdge));
+    that.edgeWithLoopToEdgeWithInternalVertex.forEach(this::mapEdgeWithLoop);
+    this.maxNumberOfIterations = that.maxNumberOfIterations;
+    this.terminationCondition = that.terminationCondition;
+  }
+
+  @Override
+  public LoopVertex getClone() {
+    return new LoopVertex(this);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
index 5a2ad41..a4a03bf 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
@@ -41,17 +41,29 @@ public final class MetricCollectionBarrierVertex<K, V> extends IRVertex {
    * Constructor for dynamic optimization vertex.
    */
   public MetricCollectionBarrierVertex() {
+    super();
     this.metricData = new HashMap<>();
     this.blockIds = new ArrayList<>();
     this.dagSnapshot = null;
   }
 
+  /**
+   * Copy constructor for MetricCollectionBarrierVertex.
+   *
+   * @param that the source object for copying
+   */
+  public MetricCollectionBarrierVertex(final MetricCollectionBarrierVertex<K, V> that) {
+    super(that);
+    // The map and the list are copied, so adding/removing entries on the clone does not affect
+    // the original; their elements and the DAG snapshot are shared with that.
+    this.metricData = new HashMap<>(that.metricData);
+    this.blockIds = new ArrayList<>(that.blockIds);
+    this.dagSnapshot = that.dagSnapshot;
+  }
+
   @Override
   public MetricCollectionBarrierVertex getClone() {
-    final MetricCollectionBarrierVertex that = new MetricCollectionBarrierVertex();
-    that.setDAGSnapshot(dagSnapshot);
-    this.copyExecutionPropertiesTo(that);
-    return that;
+    return new MetricCollectionBarrierVertex<>(this);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java
index 2d39736..636b8e9 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java
@@ -33,11 +33,18 @@ public final class OperatorVertex extends IRVertex {
     this.transform = t;
   }
 
+  /**
+   * Copy constructor of OperatorVertex; the transform is shared with {@code that}, not cloned.
+   * @param that the source object for copying
+   */
+  public OperatorVertex(final OperatorVertex that) {
+    super(that); // copies that's execution properties; a bare super() would silently drop them
+    this.transform = that.transform;
+  }
+
   @Override
   public OperatorVertex getClone() {
-    final OperatorVertex that = new OperatorVertex(this.transform);
-    this.copyExecutionPropertiesTo(that);
-    return that;
+    return new OperatorVertex(this);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
index f64e170..f1472d8 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
@@ -27,6 +27,21 @@ import java.util.List;
 public abstract class SourceVertex<O> extends IRVertex {
 
   /**
+   * Constructor for SourceVertex.
+   */
+  public SourceVertex() {
+    super();
+  }
+
+  /**
+   * Copy constructor for SourceVertex; the copy inherits the execution properties of {@code that}.
+   * @param that the source object for copying
+   */
+  public SourceVertex(final SourceVertex<?> that) {
+    super(that);
+  }
+
+  /**
    * Gets parallel readables.
    *
    * @param desiredNumOfSplits number of splits desired.
diff --git a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
index 88a768a..a1daf8b 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
@@ -113,6 +113,15 @@ public final class EmptyComponents {
       this.name = name;
     }
 
+    /**
+     * Copy constructor for EmptySourceVertex; the copy inherits the execution properties and name of {@code that}.
+     * @param that the source object for copying
+     */
+    public EmptySourceVertex(final EmptySourceVertex<T> that) {
+      super(that); // without this, the implicit super() would drop that's execution properties
+      this.name = that.name;
+    }
+
     @Override
     public String toString() {
       final StringBuilder sb = new StringBuilder();
@@ -137,7 +146,7 @@ public final class EmptyComponents {
 
     @Override
     public EmptySourceVertex<T> getClone() {
-      return new EmptySourceVertex<>(this.name);
+      return new EmptySourceVertex<>(this);
     }
   }
 
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 71fc5da..550efeb 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -41,18 +41,29 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
 
   /**
    * Constructor of BeamBoundedSourceVertex.
+   *
    * @param source BoundedSource to read from.
    */
   public BeamBoundedSourceVertex(final BoundedSource<O> source) {
+    super();
     this.source = source;
     this.sourceDescription = source.toString();
   }
 
+  /**
+   * Copy constructor of BeamBoundedSourceVertex; the BoundedSource is shared with {@code that}.
+   *
+   * @param that the source object for copying
+   */
+  public BeamBoundedSourceVertex(final BeamBoundedSourceVertex<O> that) {
+    super(that);
+    this.source = that.source;
+    this.sourceDescription = that.sourceDescription; // copied as-is; does not dereference that.source
+  }
+
   @Override
   public BeamBoundedSourceVertex getClone() {
-    final BeamBoundedSourceVertex that = new BeamBoundedSourceVertex<>(this.source);
-    this.copyExecutionPropertiesTo(that);
-    return that;
+    return new BeamBoundedSourceVertex<>(this);
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
index 3b05807..925beb9 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
@@ -41,6 +41,7 @@ public final class SparkDatasetBoundedSourceVertex<T> extends SourceVertex<T> {
    * @param dataset      Dataset to read data from.
    */
   public SparkDatasetBoundedSourceVertex(final SparkSession sparkSession, final Dataset<T> dataset) {
+    super();
     this.readables = new ArrayList<>();
     final RDD rdd = dataset.sparkRDD();
     final Partition[] partitions = rdd.getPartitions();
@@ -54,19 +55,19 @@ public final class SparkDatasetBoundedSourceVertex<T> extends SourceVertex<T> {
   }
 
   /**
-   * Constructor for cloning.
+   * Copy Constructor for SparkDatasetBoundedSourceVertex.
    *
-   * @param readables the list of Readables to set.
+   * @param that the source object for copying
    */
-  private SparkDatasetBoundedSourceVertex(final List<Readable<T>> readables) {
-    this.readables = readables;
+  public SparkDatasetBoundedSourceVertex(final SparkDatasetBoundedSourceVertex<T> that) {
+    super(that);
+    // Copy the list (not the Readables in it) so the clone's list is independent of the original.
+    this.readables = new ArrayList<>(that.readables);
   }
 
   @Override
   public SparkDatasetBoundedSourceVertex getClone() {
-    final SparkDatasetBoundedSourceVertex<T> that = new SparkDatasetBoundedSourceVertex<>((this.readables));
-    this.copyExecutionPropertiesTo(that);
-    return that;
+    return new SparkDatasetBoundedSourceVertex<>(this);
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
index 9b2fd38..9576024 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
@@ -40,6 +40,7 @@ public final class SparkTextFileBoundedSourceVertex extends SourceVertex<String>
   public SparkTextFileBoundedSourceVertex(final SparkContext sparkContext,
                                           final String inputPath,
                                           final int numPartitions) {
+    super();
     this.readables = new ArrayList<>();
     final Partition[] partitions = sparkContext.textFile(inputPath, numPartitions).getPartitions();
     for (int i = 0; i < partitions.length; i++) {
@@ -55,17 +56,17 @@ public final class SparkTextFileBoundedSourceVertex extends SourceVertex<String>
   /**
    * Constructor for cloning.
    *
-   * @param readables the list of Readables to set.
+   * @param that the source object for copying
    */
-  private SparkTextFileBoundedSourceVertex(final List<Readable<String>> readables) {
-    this.readables = readables;
+  private SparkTextFileBoundedSourceVertex(final SparkTextFileBoundedSourceVertex that) {
+    super(that);
+    // Copy the list (not the Readables in it) so the clone's list is independent of the original.
+    this.readables = new ArrayList<>(that.readables);
   }
 
   @Override
   public SparkTextFileBoundedSourceVertex getClone() {
-    final SparkTextFileBoundedSourceVertex that = new SparkTextFileBoundedSourceVertex(this.readables);
-    this.copyExecutionPropertiesTo(that);
-    return that;
+    return new SparkTextFileBoundedSourceVertex(this);
   }
 
   @Override