You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by wo...@apache.org on 2018/08/20 04:37:36 UTC
[incubator-nemo] branch master updated: [NEMO-89] Clean up
IRVertex#getClone() (#109)
This is an automated email from the ASF dual-hosted git repository.
wonook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new f1283ff [NEMO-89] Clean up IRVertex#getClone() (#109)
f1283ff is described below
commit f1283fff6c6baabd38b487eac460a11e973aa3aa
Author: Arun Lakshman R <26...@users.noreply.github.com>
AuthorDate: Mon Aug 20 10:07:34 2018 +0530
[NEMO-89] Clean up IRVertex#getClone() (#109)
JIRA: [NEMO-89: Clean up IRVertex#getClone()](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-89)
**Major changes:**
- `getClone()` has been moved into an interface
- Copy constructors have been added to the subclasses of `IRVertex`; they perform the deep copying of the objects
**Minor changes to note:**
- NA
**Tests for the changes:**
- NA
**Other comments:**
- NA
resolves NEMO-89 #resolve
---
.../main/java/edu/snu/nemo/common/Cloneable.java | 41 +++++++++++++
.../edu/snu/nemo/common/ir/vertex/IRVertex.java | 16 ++++-
.../common/ir/vertex/InMemorySourceVertex.java | 18 ++++--
.../edu/snu/nemo/common/ir/vertex/LoopVertex.java | 71 +++++++++++-----------
.../ir/vertex/MetricCollectionBarrierVertex.java | 20 ++++--
.../snu/nemo/common/ir/vertex/OperatorVertex.java | 13 +++-
.../snu/nemo/common/ir/vertex/SourceVertex.java | 15 +++++
.../edu/snu/nemo/common/test/EmptyComponents.java | 11 +++-
.../beam/source/BeamBoundedSourceVertex.java | 17 +++++-
.../source/SparkDatasetBoundedSourceVertex.java | 15 ++---
.../source/SparkTextFileBoundedSourceVertex.java | 13 ++--
11 files changed, 185 insertions(+), 65 deletions(-)
diff --git a/common/src/main/java/edu/snu/nemo/common/Cloneable.java b/common/src/main/java/edu/snu/nemo/common/Cloneable.java
new file mode 100644
index 0000000..4a75b7d
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/Cloneable.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common;
+
+/**
+ * This interface is implemented by objects that can be cloned.
+ * <p>
+ * This interface also overcomes the drawback of {@link java.lang.Cloneable} interface which
+ * doesn't have a clone method.
+ * Josh Bloch (a JDK author) has pointed out this in his book "Effective Java" as
+ * <a href="http://thefinestartist.com/effective-java/11"> Override clone judiciously </a>
+ * </p>
+ *
+ * @param <T> the type of objects that this class can clone
+ */
+public interface Cloneable<T extends Cloneable<T>> {
+
+ /**
+ * Creates and returns a copy of this object.
+ * <p>
+ * The precise meaning of "copy" may depend on the class of the object.
+ * The general intent is that, all fields of the object are copied.
+ * </p>
+ *
+ * @return a clone of this object.
+ */
+ T getClone();
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index 09ebfbf..5810233 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.common.ir.IdManager;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.dag.Vertex;
import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import edu.snu.nemo.common.Cloneable;
import java.io.Serializable;
import java.util.Optional;
@@ -27,7 +28,7 @@ import java.util.Optional;
* The basic unit of operation in a dataflow program, as well as the most important data structure in Nemo.
* An IRVertex is created and modified in the compiler, and executed in the runtime.
*/
-public abstract class IRVertex extends Vertex {
+public abstract class IRVertex extends Vertex implements Cloneable<IRVertex> {
private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
private boolean stagePartitioned;
@@ -41,9 +42,16 @@ public abstract class IRVertex extends Vertex {
}
/**
- * @return a clone elemnt of the IRVertex.
+ * Copy Constructor for IRVertex.
+ *
+ * @param that the source object for copying
*/
- public abstract IRVertex getClone();
+ public IRVertex(final IRVertex that) {
+ super(IdManager.newVertexId());
+ this.executionProperties = ExecutionPropertyMap.of(this);
+ that.getExecutionProperties().forEachProperties(this::setProperty);
+ this.stagePartitioned = that.stagePartitioned;
+ }
/**
* Static function to copy executionProperties from a vertex to the other.
@@ -55,6 +63,7 @@ public abstract class IRVertex extends Vertex {
/**
* Set an executionProperty of the IRVertex.
+ *
* @param executionProperty new execution property.
* @return the IRVertex with the execution property set.
*/
@@ -65,6 +74,7 @@ public abstract class IRVertex extends Vertex {
/**
* Set an executionProperty of the IRVertex, permanently.
+ *
* @param executionProperty new execution property.
* @return the IRVertex with the execution property set.
*/
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
index c73475b..99a4183 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
@@ -29,18 +29,28 @@ public final class InMemorySourceVertex<T> extends SourceVertex<T> {
private Iterable<T> initializedSourceData;
/**
- * Constructor.
+ * Constructor for InMemorySourceVertex.
+ *
* @param initializedSourceData the initial data object.
*/
public InMemorySourceVertex(final Iterable<T> initializedSourceData) {
+ super();
this.initializedSourceData = initializedSourceData;
}
+ /**
+ * Copy Constructor for InMemorySourceVertex.
+ *
+ * @param that the source object for copying
+ */
+ public InMemorySourceVertex(final InMemorySourceVertex that) {
+ super(that);
+ this.initializedSourceData = that.initializedSourceData;
+ }
+
@Override
public InMemorySourceVertex<T> getClone() {
- final InMemorySourceVertex<T> that = new InMemorySourceVertex<>(this.initializedSourceData);
- this.copyExecutionPropertiesTo(that);
- return that;
+ return new InMemorySourceVertex<>(this);
}
@Override
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
index 96c7fca..7d25aee 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
@@ -31,59 +31,62 @@ import java.util.stream.Collectors;
* IRVertex that contains a partial DAG that is iterative.
*/
public final class LoopVertex extends IRVertex {
+
private static int duplicateEdgeGroupId = 0;
- private final DAGBuilder<IRVertex, IREdge> builder; // Contains DAG information
+ // Contains DAG information
+ private final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
private final String compositeTransformFullName;
-
- private final Map<IRVertex, Set<IREdge>> dagIncomingEdges; // for the initial iteration
- private final Map<IRVertex, Set<IREdge>> iterativeIncomingEdges; // Edges from previous iterations connected internal.
- private final Map<IRVertex, Set<IREdge>> nonIterativeIncomingEdges; // Edges from outside previous iterations.
- private final Map<IRVertex, Set<IREdge>> dagOutgoingEdges; // for the final iteration
- private final Map<IREdge, IREdge> edgeWithLoopToEdgeWithInternalVertex;
- private final Map<IREdge, IREdge> edgeWithInternalVertexToEdgeWithLoop;
-
+ // for the initial iteration
+ private final Map<IRVertex, Set<IREdge>> dagIncomingEdges = new HashMap<>();
+ // Edges from previous iterations connected internal.
+ private final Map<IRVertex, Set<IREdge>> iterativeIncomingEdges = new HashMap<>();
+ // Edges from outside previous iterations.
+ private final Map<IRVertex, Set<IREdge>> nonIterativeIncomingEdges = new HashMap<>();
+ // for the final iteration
+ private final Map<IRVertex, Set<IREdge>> dagOutgoingEdges = new HashMap<>();
+ private final Map<IREdge, IREdge> edgeWithLoopToEdgeWithInternalVertex = new HashMap<>();
+ private final Map<IREdge, IREdge> edgeWithInternalVertexToEdgeWithLoop = new HashMap<>();
private Integer maxNumberOfIterations;
private IntPredicate terminationCondition;
/**
* The LoopVertex constructor.
+ *
* @param compositeTransformFullName full name of the composite transform.
*/
public LoopVertex(final String compositeTransformFullName) {
super();
- this.builder = new DAGBuilder<>();
this.compositeTransformFullName = compositeTransformFullName;
- this.dagIncomingEdges = new HashMap<>();
- this.iterativeIncomingEdges = new HashMap<>();
- this.nonIterativeIncomingEdges = new HashMap<>();
- this.dagOutgoingEdges = new HashMap<>();
- this.edgeWithLoopToEdgeWithInternalVertex = new HashMap<>();
- this.edgeWithInternalVertexToEdgeWithLoop = new HashMap<>();
this.maxNumberOfIterations = 1; // 1 is the default number of iterations.
this.terminationCondition = (IntPredicate & Serializable) (integer -> false); // nothing much yet.
}
- @Override
- public LoopVertex getClone() {
- final LoopVertex newLoopVertex = new LoopVertex(compositeTransformFullName);
-
+ /**
+ * Copy Constructor for LoopVertex.
+ *
+ * @param that the source object for copying
+ */
+ public LoopVertex(final LoopVertex that) {
+ super(that);
+ this.compositeTransformFullName = new String(that.compositeTransformFullName);
// Copy all elements to the clone
- final DAG<IRVertex, IREdge> dagToCopy = this.getDAG();
+ final DAG<IRVertex, IREdge> dagToCopy = that.getDAG();
dagToCopy.topologicalDo(v -> {
- newLoopVertex.getBuilder().addVertex(v, dagToCopy);
- dagToCopy.getIncomingEdgesOf(v).forEach(newLoopVertex.getBuilder()::connectVertices);
+ this.getBuilder().addVertex(v, dagToCopy);
+ dagToCopy.getIncomingEdgesOf(v).forEach(this.getBuilder()::connectVertices);
});
- this.dagIncomingEdges.forEach(((v, es) -> es.forEach(newLoopVertex::addDagIncomingEdge)));
- this.iterativeIncomingEdges.forEach((v, es) -> es.forEach(newLoopVertex::addIterativeIncomingEdge));
- this.nonIterativeIncomingEdges.forEach((v, es) -> es.forEach(newLoopVertex::addNonIterativeIncomingEdge));
- this.dagOutgoingEdges.forEach(((v, es) -> es.forEach(newLoopVertex::addDagOutgoingEdge)));
- this.edgeWithLoopToEdgeWithInternalVertex.forEach((eLoop, eInternal)
- -> newLoopVertex.mapEdgeWithLoop(eLoop, eInternal));
- newLoopVertex.setMaxNumberOfIterations(maxNumberOfIterations);
- newLoopVertex.setTerminationCondition(terminationCondition);
-
- this.copyExecutionPropertiesTo(newLoopVertex);
- return newLoopVertex;
+ that.dagIncomingEdges.forEach(((v, es) -> es.forEach(this::addDagIncomingEdge)));
+ that.iterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addIterativeIncomingEdge));
+ that.nonIterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addNonIterativeIncomingEdge));
+ that.dagOutgoingEdges.forEach(((v, es) -> es.forEach(this::addDagOutgoingEdge)));
+ that.edgeWithLoopToEdgeWithInternalVertex.forEach((eLoop, eInternal) -> this.mapEdgeWithLoop(eLoop, eInternal));
+ this.maxNumberOfIterations = that.maxNumberOfIterations;
+ this.terminationCondition = that.terminationCondition;
+ }
+
+ @Override
+ public LoopVertex getClone() {
+ return new LoopVertex(this);
}
/**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
index 5a2ad41..a4a03bf 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
@@ -41,17 +41,29 @@ public final class MetricCollectionBarrierVertex<K, V> extends IRVertex {
* Constructor for dynamic optimization vertex.
*/
public MetricCollectionBarrierVertex() {
+ super();
this.metricData = new HashMap<>();
this.blockIds = new ArrayList<>();
this.dagSnapshot = null;
}
+ /**
+ * Copy constructor for dynamic optimization vertex.
+ *
+ * @param that the source object for copying
+ */
+ public MetricCollectionBarrierVertex(final MetricCollectionBarrierVertex<K, V> that) {
+ super(that);
+ this.metricData = new HashMap<>();
+ that.metricData.forEach(this.metricData::put);
+ this.blockIds = new ArrayList<>();
+ that.blockIds.forEach(this.blockIds::add);
+ this.dagSnapshot = that.dagSnapshot;
+ }
+
@Override
public MetricCollectionBarrierVertex getClone() {
- final MetricCollectionBarrierVertex that = new MetricCollectionBarrierVertex();
- that.setDAGSnapshot(dagSnapshot);
- this.copyExecutionPropertiesTo(that);
- return that;
+ return new MetricCollectionBarrierVertex(this);
}
/**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java
index 2d39736..636b8e9 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/OperatorVertex.java
@@ -33,11 +33,18 @@ public final class OperatorVertex extends IRVertex {
this.transform = t;
}
+ /**
+ * Copy Constructor of OperatorVertex; execution properties are copied by the IRVertex copy constructor.
+ * @param that the source object for copying
+ */
+ public OperatorVertex(final OperatorVertex that) {
+ super(that);
+ this.transform = that.transform;
+ }
+
@Override
public OperatorVertex getClone() {
- final OperatorVertex that = new OperatorVertex(this.transform);
- this.copyExecutionPropertiesTo(that);
- return that;
+ return new OperatorVertex(this);
}
/**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
index f64e170..f1472d8 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
@@ -27,6 +27,21 @@ import java.util.List;
public abstract class SourceVertex<O> extends IRVertex {
/**
+ * Constructor for SourceVertex.
+ */
+ public SourceVertex() {
+ super();
+ }
+
+ /**
+ * Copy Constructor for SourceVertex.
+ *
+ * @param that the source object for copying
+ */
+ public SourceVertex(final SourceVertex that) {
+ super(that);
+ }
+ /**
* Gets parallel readables.
*
* @param desiredNumOfSplits number of splits desired.
diff --git a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
index 88a768a..a1daf8b 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
@@ -113,6 +113,15 @@ public final class EmptyComponents {
this.name = name;
}
+ /**
+ * Copy Constructor for EmptySourceVertex; delegates to the superclass copy constructor first.
+ * @param that the source object for copying
+ */
+ public EmptySourceVertex(final EmptySourceVertex that) {
+ super(that);
+ this.name = that.name;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@@ -137,7 +146,7 @@ public final class EmptyComponents {
@Override
public EmptySourceVertex<T> getClone() {
- return new EmptySourceVertex<>(this.name);
+ return new EmptySourceVertex<>(this);
}
}
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 71fc5da..550efeb 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -41,18 +41,29 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
/**
* Constructor of BeamBoundedSourceVertex.
+ *
* @param source BoundedSource to read from.
*/
public BeamBoundedSourceVertex(final BoundedSource<O> source) {
+ super();
this.source = source;
this.sourceDescription = source.toString();
}
+ /**
+ * Copy constructor of BeamBoundedSourceVertex.
+ *
+ * @param that the source object for copying
+ */
+ public BeamBoundedSourceVertex(final BeamBoundedSourceVertex that) {
+ super(that);
+ this.source = that.source;
+ this.sourceDescription = that.source.toString();
+ }
+
@Override
public BeamBoundedSourceVertex getClone() {
- final BeamBoundedSourceVertex that = new BeamBoundedSourceVertex<>(this.source);
- this.copyExecutionPropertiesTo(that);
- return that;
+ return new BeamBoundedSourceVertex(this);
}
@Override
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
index 3b05807..925beb9 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
@@ -41,6 +41,7 @@ public final class SparkDatasetBoundedSourceVertex<T> extends SourceVertex<T> {
* @param dataset Dataset to read data from.
*/
public SparkDatasetBoundedSourceVertex(final SparkSession sparkSession, final Dataset<T> dataset) {
+ super();
this.readables = new ArrayList<>();
final RDD rdd = dataset.sparkRDD();
final Partition[] partitions = rdd.getPartitions();
@@ -54,19 +55,19 @@ public final class SparkDatasetBoundedSourceVertex<T> extends SourceVertex<T> {
}
/**
- * Constructor for cloning.
+ * Copy Constructor for SparkDatasetBoundedSourceVertex.
*
- * @param readables the list of Readables to set.
+ * @param that the source object for copying
*/
- private SparkDatasetBoundedSourceVertex(final List<Readable<T>> readables) {
- this.readables = readables;
+ public SparkDatasetBoundedSourceVertex(final SparkDatasetBoundedSourceVertex<T> that) {
+ super(that);
+ this.readables = new ArrayList<>();
+ that.readables.forEach(this.readables::add);
}
@Override
public SparkDatasetBoundedSourceVertex getClone() {
- final SparkDatasetBoundedSourceVertex<T> that = new SparkDatasetBoundedSourceVertex<>((this.readables));
- this.copyExecutionPropertiesTo(that);
- return that;
+ return new SparkDatasetBoundedSourceVertex(this);
}
@Override
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
index 9b2fd38..9576024 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
@@ -40,6 +40,7 @@ public final class SparkTextFileBoundedSourceVertex extends SourceVertex<String>
public SparkTextFileBoundedSourceVertex(final SparkContext sparkContext,
final String inputPath,
final int numPartitions) {
+ super();
this.readables = new ArrayList<>();
final Partition[] partitions = sparkContext.textFile(inputPath, numPartitions).getPartitions();
for (int i = 0; i < partitions.length; i++) {
@@ -55,17 +56,17 @@ public final class SparkTextFileBoundedSourceVertex extends SourceVertex<String>
/**
* Constructor for cloning.
*
- * @param readables the list of Readables to set.
+ * @param that the source object for copying
*/
- private SparkTextFileBoundedSourceVertex(final List<Readable<String>> readables) {
- this.readables = readables;
+ private SparkTextFileBoundedSourceVertex(final SparkTextFileBoundedSourceVertex that) {
+ super(that);
+ this.readables = new ArrayList<>();
+ that.readables.forEach(this.readables::add);
}
@Override
public SparkTextFileBoundedSourceVertex getClone() {
- final SparkTextFileBoundedSourceVertex that = new SparkTextFileBoundedSourceVertex(this.readables);
- this.copyExecutionPropertiesTo(that);
- return that;
+ return new SparkTextFileBoundedSourceVertex(this);
}
@Override