You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/22 08:50:39 UTC

[incubator-nemo] branch reshaping updated: relay to stream

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/reshaping by this push:
     new cd38af6  relay to stream
cd38af6 is described below

commit cd38af68729357098259d0984e54b96b0e76f469
Author: John Yang <jo...@apache.org>
AuthorDate: Tue Jan 22 17:50:29 2019 +0900

    relay to stream
---
 .../nemo/common/ir/vertex/system/StreamVertex.java |  4 +--
 .../{RelayTransform.java => StreamTransform.java}  |  8 +++---
 .../DedicatedKeyPerElementPartitioner.java         |  4 ++-
 .../runtime/executor/task/TaskExecutorTest.java    | 32 +++++++++++-----------
 4 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/StreamVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/StreamVertex.java
index d9884bf..4f4a9ca 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/StreamVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/StreamVertex.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.common.ir.vertex.system;
 
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.ir.vertex.transform.RelayTransform;
+import org.apache.nemo.common.ir.vertex.transform.StreamTransform;
 
 /**
  * IRVertex that transforms input data.
@@ -30,6 +30,6 @@ public final class StreamVertex extends OperatorVertex {
    * Constructor.
    */
   public StreamVertex() {
-    super(new RelayTransform());
+    super(new StreamTransform());
   }
 }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/StreamTransform.java
similarity index 89%
rename from common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
rename to common/src/main/java/org/apache/nemo/common/ir/vertex/transform/StreamTransform.java
index cd713d3..97a7013 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/StreamTransform.java
@@ -28,14 +28,14 @@ import org.slf4j.LoggerFactory;
  * This transform can be used for merging input data into the {@link OutputCollector}.
  * @param <T> input/output type.
  */
-public final class RelayTransform<T> implements Transform<T, T> {
+public final class StreamTransform<T> implements Transform<T, T> {
   private OutputCollector<T> outputCollector;
-  private static final Logger LOG = LoggerFactory.getLogger(RelayTransform.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(StreamTransform.class.getName());
 
   /**
    * Default constructor.
    */
-  public RelayTransform() {
+  public StreamTransform() {
     // Do nothing.
   }
 
@@ -62,7 +62,7 @@ public final class RelayTransform<T> implements Transform<T, T> {
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append(RelayTransform.class);
+    sb.append(StreamTransform.class);
     sb.append(":");
     sb.append(super.toString());
     return sb.toString();
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
index e9504c2..b87f819 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -18,11 +18,13 @@
  */
 package org.apache.nemo.runtime.common.partitioner;
 
+import org.apache.nemo.common.ir.vertex.transform.StreamTransform;
+
 /**
  * An implementation of {@link Partitioner} which assigns a dedicated key to each output element of a task.
  * WARNING: Because this partitioner assigns a dedicated key per element, it should be used only in cases
  * where the number of output elements is small. For example, every output element of
- * {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform} inserted by large shuffle optimization is always
+ * {@link StreamTransform} inserted by large shuffle optimization is always
  * a partition. In this case, assigning a key for each element can be useful.
  */
 @DedicatedKeyPerElement
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 919143b..838271c 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -203,7 +203,7 @@ public final class TaskExecutorTest {
     vertexIdToReadable.put(sourceIRVertex.getId(), readable);
     final List<Watermark> emittedWatermarks = new LinkedList<>();
 
-    final Transform transform = new RelayTransformNoWatermarkEmit(emittedWatermarks);
+    final Transform transform = new StreamTransformNoWatermarkEmit(emittedWatermarks);
     final OperatorVertex operatorVertex = new OperatorVertex(transform);
 
     final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
@@ -240,7 +240,7 @@ public final class TaskExecutorTest {
    */
   @Test(timeout=5000)
   public void testParentTaskDataFetching() throws Exception {
-    final IRVertex vertex = new OperatorVertex(new RelayTransform());
+    final IRVertex vertex = new OperatorVertex(new StreamTransform());
 
     final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
         .addVertex(vertex)
@@ -290,9 +290,9 @@ public final class TaskExecutorTest {
   @Test()
   public void testMultipleIncomingEdges() throws Exception {
     final List<Watermark> emittedWatermarks = new ArrayList<>();
-    final IRVertex operatorIRVertex1 = new OperatorVertex(new RelayTransform());
-    final IRVertex operatorIRVertex2 = new OperatorVertex(new RelayTransformNoWatermarkEmit(emittedWatermarks));
-    final IRVertex operatorIRVertex3 = new OperatorVertex(new RelayTransform());
+    final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
+    final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransformNoWatermarkEmit(emittedWatermarks));
+    final IRVertex operatorIRVertex3 = new OperatorVertex(new StreamTransform());
 
     final IRVertex sourceIRVertex1 = new TestUnboundedSourceVertex();
     final IRVertex sourceIRVertex2 = new TestUnboundedSourceVertex();
@@ -376,8 +376,8 @@ public final class TaskExecutorTest {
    */
   @Test(timeout=5000)
   public void testTwoOperators() throws Exception {
-    final IRVertex operatorIRVertex1 = new OperatorVertex(new RelayTransform());
-    final IRVertex operatorIRVertex2 = new OperatorVertex(new RelayTransform());
+    final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
+    final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransform());
 
     final String edgeId = "edge";
     final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
@@ -409,7 +409,7 @@ public final class TaskExecutorTest {
     final Transform singleListTransform = new CreateSingleListTransform();
 
     final long broadcastId = 0;
-    final IRVertex operatorIRVertex1 = new OperatorVertex(new RelayTransform());
+    final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
     final IRVertex operatorIRVertex2 = new OperatorVertex(new BroadcastVariablePairingTransform(broadcastId));
 
     final String edgeId = "edge";
@@ -460,9 +460,9 @@ public final class TaskExecutorTest {
 
     final IRVertex routerVertex = new OperatorVertex(
       new RoutingTransform(Arrays.asList(additionalTag1, additionalTag2)));
-    final IRVertex mainVertex= new OperatorVertex(new RelayTransform());
-    final IRVertex bonusVertex1 = new OperatorVertex(new RelayTransform());
-    final IRVertex bonusVertex2 = new OperatorVertex(new RelayTransform());
+    final IRVertex mainVertex= new OperatorVertex(new StreamTransform());
+    final IRVertex bonusVertex1 = new OperatorVertex(new StreamTransform());
+    final IRVertex bonusVertex2 = new OperatorVertex(new StreamTransform());
 
     final RuntimeEdge<IRVertex> edge1 = createEdge(routerVertex, mainVertex, "edge-1");
     final RuntimeEdge<IRVertex> edge2 = createEdge(routerVertex, bonusVertex1, "edge-2");
@@ -523,7 +523,7 @@ public final class TaskExecutorTest {
     return new StageEdge("SEdge" + RUNTIME_EDGE_ID.getAndIncrement(),
         ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne),
         irVertex,
-        new OperatorVertex(new RelayTransform()),
+        new OperatorVertex(new StreamTransform()),
         mock(Stage.class),
         mock(Stage.class));
   }
@@ -533,7 +533,7 @@ public final class TaskExecutorTest {
       ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne);
     return new StageEdge("runtime outgoing edge id",
       executionPropertyMap,
-      new OperatorVertex(new RelayTransform()),
+      new OperatorVertex(new StreamTransform()),
       irVertex,
       mock(Stage.class),
       mock(Stage.class));
@@ -591,11 +591,11 @@ public final class TaskExecutorTest {
    * because OutputWriter currently does not support watermarks (TODO #245)
    * @param <T> type
    */
-  private class RelayTransformNoWatermarkEmit<T> implements Transform<T, T> {
+  private class StreamTransformNoWatermarkEmit<T> implements Transform<T, T> {
     private OutputCollector<T> outputCollector;
     private final List<Watermark> emittedWatermarks;
 
-    RelayTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks) {
+    StreamTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks) {
       this.emittedWatermarks = emittedWatermarks;
     }
 
@@ -714,7 +714,7 @@ public final class TaskExecutorTest {
    * Simple identity function for testing.
    * @param <T> input/output type.
    */
-  private class RelayTransform<T> implements Transform<T, T> {
+  private class StreamTransform<T> implements Transform<T, T> {
     private OutputCollector<T> outputCollector;
 
     @Override