You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/22 08:50:39 UTC
[incubator-nemo] branch reshaping updated: relay to stream
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/reshaping by this push:
new cd38af6 relay to stream
cd38af6 is described below
commit cd38af68729357098259d0984e54b96b0e76f469
Author: John Yang <jo...@apache.org>
AuthorDate: Tue Jan 22 17:50:29 2019 +0900
relay to stream
---
.../nemo/common/ir/vertex/system/StreamVertex.java | 4 +--
.../{RelayTransform.java => StreamTransform.java} | 8 +++---
.../DedicatedKeyPerElementPartitioner.java | 4 ++-
.../runtime/executor/task/TaskExecutorTest.java | 32 +++++++++++-----------
4 files changed, 25 insertions(+), 23 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/StreamVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/StreamVertex.java
index d9884bf..4f4a9ca 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/system/StreamVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/system/StreamVertex.java
@@ -19,7 +19,7 @@
package org.apache.nemo.common.ir.vertex.system;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.ir.vertex.transform.RelayTransform;
+import org.apache.nemo.common.ir.vertex.transform.StreamTransform;
/**
* IRVertex that transforms input data.
@@ -30,6 +30,6 @@ public final class StreamVertex extends OperatorVertex {
* Constructor.
*/
public StreamVertex() {
- super(new RelayTransform());
+ super(new StreamTransform());
}
}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/StreamTransform.java
similarity index 89%
rename from common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
rename to common/src/main/java/org/apache/nemo/common/ir/vertex/transform/StreamTransform.java
index cd713d3..97a7013 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/StreamTransform.java
@@ -28,14 +28,14 @@ import org.slf4j.LoggerFactory;
* This transform can be used for merging input data into the {@link OutputCollector}.
* @param <T> input/output type.
*/
-public final class RelayTransform<T> implements Transform<T, T> {
+public final class StreamTransform<T> implements Transform<T, T> {
private OutputCollector<T> outputCollector;
- private static final Logger LOG = LoggerFactory.getLogger(RelayTransform.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(StreamTransform.class.getName());
/**
* Default constructor.
*/
- public RelayTransform() {
+ public StreamTransform() {
// Do nothing.
}
@@ -62,7 +62,7 @@ public final class RelayTransform<T> implements Transform<T, T> {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
- sb.append(RelayTransform.class);
+ sb.append(StreamTransform.class);
sb.append(":");
sb.append(super.toString());
return sb.toString();
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
index e9504c2..b87f819 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -18,11 +18,13 @@
*/
package org.apache.nemo.runtime.common.partitioner;
+import org.apache.nemo.common.ir.vertex.transform.StreamTransform;
+
/**
* An implementation of {@link Partitioner} which assigns a dedicated key to each output element of a task.
* WARNING: Because this partitioner assigns a dedicated key per element, it should be used only in specific circumstances
* where the number of output elements is small. For example, every output element of
- * {@link org.apache.nemo.common.ir.vertex.transform.RelayTransform} inserted by large shuffle optimization is always
+ * {@link StreamTransform} inserted by large shuffle optimization is always
* a partition. In this case, assigning a key for each element can be useful.
*/
@DedicatedKeyPerElement
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 919143b..838271c 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -203,7 +203,7 @@ public final class TaskExecutorTest {
vertexIdToReadable.put(sourceIRVertex.getId(), readable);
final List<Watermark> emittedWatermarks = new LinkedList<>();
- final Transform transform = new RelayTransformNoWatermarkEmit(emittedWatermarks);
+ final Transform transform = new StreamTransformNoWatermarkEmit(emittedWatermarks);
final OperatorVertex operatorVertex = new OperatorVertex(transform);
final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
@@ -240,7 +240,7 @@ public final class TaskExecutorTest {
*/
@Test(timeout=5000)
public void testParentTaskDataFetching() throws Exception {
- final IRVertex vertex = new OperatorVertex(new RelayTransform());
+ final IRVertex vertex = new OperatorVertex(new StreamTransform());
final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
.addVertex(vertex)
@@ -290,9 +290,9 @@ public final class TaskExecutorTest {
@Test()
public void testMultipleIncomingEdges() throws Exception {
final List<Watermark> emittedWatermarks = new ArrayList<>();
- final IRVertex operatorIRVertex1 = new OperatorVertex(new RelayTransform());
- final IRVertex operatorIRVertex2 = new OperatorVertex(new RelayTransformNoWatermarkEmit(emittedWatermarks));
- final IRVertex operatorIRVertex3 = new OperatorVertex(new RelayTransform());
+ final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
+ final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransformNoWatermarkEmit(emittedWatermarks));
+ final IRVertex operatorIRVertex3 = new OperatorVertex(new StreamTransform());
final IRVertex sourceIRVertex1 = new TestUnboundedSourceVertex();
final IRVertex sourceIRVertex2 = new TestUnboundedSourceVertex();
@@ -376,8 +376,8 @@ public final class TaskExecutorTest {
*/
@Test(timeout=5000)
public void testTwoOperators() throws Exception {
- final IRVertex operatorIRVertex1 = new OperatorVertex(new RelayTransform());
- final IRVertex operatorIRVertex2 = new OperatorVertex(new RelayTransform());
+ final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
+ final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransform());
final String edgeId = "edge";
final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
@@ -409,7 +409,7 @@ public final class TaskExecutorTest {
final Transform singleListTransform = new CreateSingleListTransform();
final long broadcastId = 0;
- final IRVertex operatorIRVertex1 = new OperatorVertex(new RelayTransform());
+ final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
final IRVertex operatorIRVertex2 = new OperatorVertex(new BroadcastVariablePairingTransform(broadcastId));
final String edgeId = "edge";
@@ -460,9 +460,9 @@ public final class TaskExecutorTest {
final IRVertex routerVertex = new OperatorVertex(
new RoutingTransform(Arrays.asList(additionalTag1, additionalTag2)));
- final IRVertex mainVertex= new OperatorVertex(new RelayTransform());
- final IRVertex bonusVertex1 = new OperatorVertex(new RelayTransform());
- final IRVertex bonusVertex2 = new OperatorVertex(new RelayTransform());
+ final IRVertex mainVertex= new OperatorVertex(new StreamTransform());
+ final IRVertex bonusVertex1 = new OperatorVertex(new StreamTransform());
+ final IRVertex bonusVertex2 = new OperatorVertex(new StreamTransform());
final RuntimeEdge<IRVertex> edge1 = createEdge(routerVertex, mainVertex, "edge-1");
final RuntimeEdge<IRVertex> edge2 = createEdge(routerVertex, bonusVertex1, "edge-2");
@@ -523,7 +523,7 @@ public final class TaskExecutorTest {
return new StageEdge("SEdge" + RUNTIME_EDGE_ID.getAndIncrement(),
ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne),
irVertex,
- new OperatorVertex(new RelayTransform()),
+ new OperatorVertex(new StreamTransform()),
mock(Stage.class),
mock(Stage.class));
}
@@ -533,7 +533,7 @@ public final class TaskExecutorTest {
ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne);
return new StageEdge("runtime outgoing edge id",
executionPropertyMap,
- new OperatorVertex(new RelayTransform()),
+ new OperatorVertex(new StreamTransform()),
irVertex,
mock(Stage.class),
mock(Stage.class));
@@ -591,11 +591,11 @@ public final class TaskExecutorTest {
* because OutputWriter currently does not support watermarks (TODO #245)
* @param <T> type
*/
- private class RelayTransformNoWatermarkEmit<T> implements Transform<T, T> {
+ private class StreamTransformNoWatermarkEmit<T> implements Transform<T, T> {
private OutputCollector<T> outputCollector;
private final List<Watermark> emittedWatermarks;
- RelayTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks) {
+ StreamTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks) {
this.emittedWatermarks = emittedWatermarks;
}
@@ -714,7 +714,7 @@ public final class TaskExecutorTest {
* Simple identity function for testing.
* @param <T> input/output type.
*/
- private class RelayTransform<T> implements Transform<T, T> {
+ private class StreamTransform<T> implements Transform<T, T> {
private OutputCollector<T> outputCollector;
@Override