You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/06/15 10:42:09 UTC
[incubator-nemo] branch master updated: [NEMO-83] Move /tests/runtime into /runtime/tests (#47)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new a266d40 [NEMO-83] Move /tests/runtime into /runtime/tests (#47)
a266d40 is described below
commit a266d4056f076164833da58f57c534f2bf3deec7
Author: Yunseong Lee <yu...@me.com>
AuthorDate: Fri Jun 15 19:42:05 2018 +0900
[NEMO-83] Move /tests/runtime into /runtime/tests (#47)
JIRA: [NEMO-83: Move /tests/runtime into /runtime/tests](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-83)
**Major changes:**
- This PR moves the tests under tests/runtime into the runtime modules they exercise (runtime/common and runtime/executor, i.e. nemo-runtime-[common|executor])
**Minor changes to note:**
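- Coders used by the tests now live in the common module (`edu.snu.nemo.common.coder`): IntCoder is newly added, and PairCoder has been moved out of the Beam frontend and rewritten against Nemo's own Coder interface, to remove the tests' dependency on Beam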
- `e.s.n.tests.runtime.common.plan.DAGConverterTest` stays in place due to the complex dependency; as per discussion with @johnyangk, we decided to revisit this test in another issue.
**Tests for the changes:**
- N/A
**Other comments:**
- N/A
resolves [NEMO-83](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-83)
---
.../java/edu/snu/nemo/common/coder/IntCoder.java | 52 +++++++++++++++++
.../java/edu/snu/nemo/common}/coder/PairCoder.java | 66 ++--------------------
.../snu/nemo/runtime/common}/RuntimeTestUtil.java | 13 ++++-
.../common/message/local/LocalMessageTest.java | 2 +-
.../pass/runtime/DataSkewRuntimePassTest.java | 4 +-
runtime/executor/pom.xml | 12 ++++
.../runtime/executor/data/BlockStoreTest.java | 16 +++---
.../data/BlockTransferConnectionQueueTest.java | 2 +-
.../executor/datatransfer/DataTransferTest.java | 13 ++---
9 files changed, 95 insertions(+), 85 deletions(-)
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java
new file mode 100644
index 0000000..face994
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.*;
+
+/**
+ * A {@link Coder} which is used for an integer.
+ */
+public final class IntCoder implements Coder<Integer> {
+
+ /**
+ * A private constructor.
+ */
+ private IntCoder() {
+ }
+
+ /**
+ * Static initializer of the coder.
+ */
+ public static IntCoder of() {
+ return new IntCoder();
+ }
+
+ @Override
+ public void encode(final Integer value, final OutputStream outStream) throws IOException {
+ final DataOutputStream dataOutputStream = new DataOutputStream(outStream);
+ dataOutputStream.writeInt(value);
+ }
+
+ @Override
+ public Integer decode(final InputStream inStream) throws IOException {
+ // If the inStream is closed well in upper level, it is okay to not close this stream
+ // because the DataInputStream itself will not contain any extra information.
+ // (when we close this stream, the inStream will be closed together.)
+ final DataInputStream dataInputStream = new DataInputStream(inStream);
+ return dataInputStream.readInt();
+ }
+}
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/PairCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java
similarity index 50%
rename from compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/PairCoder.java
rename to common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java
index 0fe6e88..3ad3552 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/PairCoder.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java
@@ -13,26 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.compiler.frontend.beam.coder;
+package edu.snu.nemo.common.coder;
import edu.snu.nemo.common.Pair;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
/**
- * BEAM Coder for {@link edu.snu.nemo.common.Pair}. Reference: KvCoder in BEAM.
+ * A Coder for {@link edu.snu.nemo.common.Pair}. Reference: KvCoder in BEAM.
* @param <A> type for the left coder.
* @param <B> type for the right coder.
*/
-public final class PairCoder<A, B> extends StructuredCoder<Pair<A, B>> {
+public final class PairCoder<A, B> implements Coder<Pair<A, B>> {
private final Coder<A> leftCoder;
private final Coder<B> rightCoder;
@@ -64,6 +57,7 @@ public final class PairCoder<A, B> extends StructuredCoder<Pair<A, B>> {
Coder<A> getLeftCoder() {
return leftCoder;
}
+
/**
* @return the right coder.
*/
@@ -71,12 +65,10 @@ public final class PairCoder<A, B> extends StructuredCoder<Pair<A, B>> {
return rightCoder;
}
- //=====================================================================================================
-
@Override
public void encode(final Pair<A, B> pair, final OutputStream outStream) throws IOException {
if (pair == null) {
- throw new CoderException("cannot encode a null KV");
+ throw new IOException("cannot encode a null pair");
}
leftCoder.encode(pair.left(), outStream);
rightCoder.encode(pair.right(), outStream);
@@ -88,52 +80,4 @@ public final class PairCoder<A, B> extends StructuredCoder<Pair<A, B>> {
final B value = rightCoder.decode(inStream);
return Pair.of(key, value);
}
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Arrays.asList(leftCoder, rightCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic(this, "Key coder must be deterministic", getLeftCoder());
- verifyDeterministic(this, "Value coder must be deterministic", getRightCoder());
- }
-
- @Override
- public boolean consistentWithEquals() {
- return leftCoder.consistentWithEquals() && rightCoder.consistentWithEquals();
- }
-
- @Override
- public Object structuralValue(final Pair<A, B> pair) {
- if (consistentWithEquals()) {
- return pair;
- } else {
- return Pair.of(getLeftCoder().structuralValue(pair.left()), getRightCoder().structuralValue(pair.right()));
- }
- }
-
- /**
- * Returns whether both leftCoder and rightCoder are considered not expensive.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(final Pair<A, B> pair) {
- return leftCoder.isRegisterByteSizeObserverCheap(pair.left())
- && rightCoder.isRegisterByteSizeObserverCheap(pair.right());
- }
-
- /**
- * Notifies ElementByteSizeObserver about the byte size of the
- * encoded value using this coder.
- */
- @Override
- public void registerByteSizeObserver(final Pair<A, B> pair,
- final ElementByteSizeObserver observer) throws Exception {
- if (pair == null) {
- throw new CoderException("cannot encode a null Pair");
- }
- leftCoder.registerByteSizeObserver(pair.left(), observer);
- rightCoder.registerByteSizeObserver(pair.right(), observer);
- }
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeTestUtil.java
similarity index 85%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeTestUtil.java
index 26b4a54..0a75b6f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeTestUtil.java
@@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.tests.runtime;
+package edu.snu.nemo.runtime.common;
-import org.apache.beam.sdk.values.KV;
+import edu.snu.nemo.common.Pair;
import java.util.*;
import java.util.stream.Collectors;
@@ -25,6 +25,13 @@ import java.util.stream.IntStream;
* Utility class for runtime unit tests.
*/
public final class RuntimeTestUtil {
+
+ /**
+ * Private constructor for utility class.
+ */
+ private RuntimeTestUtil() {
+ }
+
/**
* Gets a list of integer pair elements in range.
* @param start value of the range (inclusive).
@@ -34,7 +41,7 @@ public final class RuntimeTestUtil {
public static List getRangedNumList(final int start,
final int end) {
final List numList = new ArrayList<>(end - start);
- IntStream.range(start, end).forEach(number -> numList.add(KV.of(number, number)));
+ IntStream.range(start, end).forEach(number -> numList.add(Pair.of(number, number)));
return numList;
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/message/local/LocalMessageTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
similarity index 99%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/common/message/local/LocalMessageTest.java
rename to runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
index 24d86d9..a05b7d2 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/message/local/LocalMessageTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.tests.runtime.common.message.local;
+package edu.snu.nemo.runtime.common.message.local;
import edu.snu.nemo.runtime.common.message.MessageContext;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
similarity index 93%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
rename to runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index d7afe45..cd2fc16 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -13,11 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.tests.runtime.common.optimizer.pass.runtime;
+package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
-import edu.snu.nemo.common.Pair;
import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
import org.junit.Before;
import org.junit.Test;
diff --git a/runtime/executor/pom.xml b/runtime/executor/pom.xml
index 578b080..0997f8f 100644
--- a/runtime/executor/pom.xml
+++ b/runtime/executor/pom.xml
@@ -57,5 +57,17 @@ limitations under the License.
<artifactId>lz4-java</artifactId>
<version>1.4.1</version>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.snu.nemo</groupId>
+ <artifactId>nemo-runtime-master</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
similarity index 98%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index d864819..6bac935 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -13,11 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.tests.runtime.executor.data;
+package edu.snu.nemo.runtime.executor.data;
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.coder.IntCoder;
+import edu.snu.nemo.common.coder.PairCoder;
import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder;
import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.data.HashRange;
@@ -26,7 +28,6 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
import edu.snu.nemo.runtime.common.state.BlockState;
-import edu.snu.nemo.runtime.executor.data.*;
import edu.snu.nemo.runtime.executor.data.block.Block;
import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
@@ -34,9 +35,6 @@ import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
import edu.snu.nemo.runtime.executor.data.stores.*;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.RuntimeMaster;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.values.KV;
import org.apache.commons.io.FileUtils;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
@@ -59,7 +57,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import static edu.snu.nemo.tests.runtime.RuntimeTestUtil.getRangedNumList;
+import static edu.snu.nemo.runtime.common.RuntimeTestUtil.getRangedNumList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@@ -73,7 +71,7 @@ import static org.mockito.Mockito.when;
@PrepareForTest({BlockManagerMaster.class, RuntimeMaster.class, SerializerManager.class})
public final class BlockStoreTest {
private static final String TMP_FILE_DIRECTORY = "./tmpFiles";
- private static final Coder CODER = new BeamCoder(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
+ private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
private static final Serializer SERIALIZER = new Serializer(CODER,
Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Compression.LZ4)));
private static final SerializerManager serializerManager = mock(SerializerManager.class);
@@ -575,7 +573,7 @@ public final class BlockStoreTest {
final int start,
final int end) {
final List numList = new ArrayList<>(end - start);
- IntStream.range(start, end).forEach(number -> numList.add(KV.of(key, number)));
+ IntStream.range(start, end).forEach(number -> numList.add(Pair.of(key, number)));
return numList;
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
similarity index 98%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java
rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
index bc1ba30..eeb0031 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.tests.runtime.executor.data;
+package edu.snu.nemo.runtime.executor.data;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.executor.data.BlockTransferConnectionQueue;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
similarity index 98%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 6a9dbfb..a0252ca 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.tests.runtime.executor.datatransfer;
+package edu.snu.nemo.runtime.executor.datatransfer;
+import edu.snu.nemo.common.coder.IntCoder;
+import edu.snu.nemo.common.coder.PairCoder;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.*;
@@ -28,7 +30,6 @@ import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
@@ -53,8 +54,6 @@ import edu.snu.nemo.runtime.master.RuntimeMaster;
import edu.snu.nemo.runtime.master.resource.ContainerManager;
import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
import edu.snu.nemo.runtime.master.scheduler.*;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.commons.io.FileUtils;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.io.network.naming.NameResolverConfiguration;
@@ -81,8 +80,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static edu.snu.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY;
-import static edu.snu.nemo.tests.runtime.RuntimeTestUtil.flatten;
-import static edu.snu.nemo.tests.runtime.RuntimeTestUtil.getRangedNumList;
+import static edu.snu.nemo.runtime.common.RuntimeTestUtil.getRangedNumList;
+import static edu.snu.nemo.runtime.common.RuntimeTestUtil.flatten;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -107,7 +106,7 @@ public final class DataTransferTest {
private static final int PARALLELISM_TEN = 10;
private static final String EDGE_PREFIX_TEMPLATE = "Dummy(%d)";
private static final AtomicInteger TEST_INDEX = new AtomicInteger(0);
- private static final Coder CODER = new BeamCoder(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
+ private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
private static final Tang TANG = Tang.Factory.getTang();
private static final int HASH_RANGE_MULTIPLIER = 10;
--
To stop receiving notification emails like this one, please contact
johnyangk@apache.org.