You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/06/15 10:42:09 UTC

[incubator-nemo] branch master updated: [NEMO-83] Move /tests/runtime into /runtime/tests (#47)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new a266d40  [NEMO-83] Move /tests/runtime into /runtime/tests (#47)
a266d40 is described below

commit a266d4056f076164833da58f57c534f2bf3deec7
Author: Yunseong Lee <yu...@me.com>
AuthorDate: Fri Jun 15 19:42:05 2018 +0900

    [NEMO-83] Move /tests/runtime into /runtime/tests (#47)
    
    JIRA: [NEMO-83: Move /tests/runtime into /runtime/tests](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-83)
    
    **Major changes:**
    - This PR moves the tests/runtime to nemo-runtime-[common|executor]
    
    **Minor changes to note:**
    - New Coders (PairCoder, IntCoder) in tests have been added to remove the dependency on Beam
    - `e.s.n.tests.runtime.common.plan.DAGConverterTest` stays in place due to the complex dependency; as per discussion with @johnyangk, we decided to revisit this test in another issue.
    
    **Tests for the changes:**
    - N/A
    
    **Other comments:**
    - N/A
    
    resolves [NEMO-83](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-83)
---
 .../java/edu/snu/nemo/common/coder/IntCoder.java   | 52 +++++++++++++++++
 .../java/edu/snu/nemo/common}/coder/PairCoder.java | 66 ++--------------------
 .../snu/nemo/runtime/common}/RuntimeTestUtil.java  | 13 ++++-
 .../common/message/local/LocalMessageTest.java     |  2 +-
 .../pass/runtime/DataSkewRuntimePassTest.java      |  4 +-
 runtime/executor/pom.xml                           | 12 ++++
 .../runtime/executor/data/BlockStoreTest.java      | 16 +++---
 .../data/BlockTransferConnectionQueueTest.java     |  2 +-
 .../executor/datatransfer/DataTransferTest.java    | 13 ++---
 9 files changed, 95 insertions(+), 85 deletions(-)

diff --git a/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java
new file mode 100644
index 0000000..face994
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/IntCoder.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.*;
+
+/**
+ * A {@link Coder} which is used for an integer.
+ */
+public final class IntCoder implements Coder<Integer> {
+
+  /**
+   * A private constructor.
+   */
+  private IntCoder() {
+  }
+
+  /**
+   * Static initializer of the coder.
+   */
+  public static IntCoder of() {
+    return new IntCoder();
+  }
+
+  @Override
+  public void encode(final Integer value, final OutputStream outStream) throws IOException {
+    final DataOutputStream dataOutputStream = new DataOutputStream(outStream);
+    dataOutputStream.writeInt(value);
+  }
+
+  @Override
+  public Integer decode(final InputStream inStream) throws IOException {
+    // If the inStream is closed well in upper level, it is okay to not close this stream
+    // because the DataInputStream itself will not contain any extra information.
+    // (when we close this stream, the inStream will be closed together.)
+    final DataInputStream dataInputStream = new DataInputStream(inStream);
+    return dataInputStream.readInt();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/PairCoder.java b/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java
similarity index 50%
rename from compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/PairCoder.java
rename to common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java
index 0fe6e88..3ad3552 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/PairCoder.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/PairCoder.java
@@ -13,26 +13,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.compiler.frontend.beam.coder;
+package edu.snu.nemo.common.coder;
 
 import edu.snu.nemo.common.Pair;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
 
 /**
- * BEAM Coder for {@link edu.snu.nemo.common.Pair}. Reference: KvCoder in BEAM.
+ * A Coder for {@link edu.snu.nemo.common.Pair}. Reference: KvCoder in BEAM.
  * @param <A> type for the left coder.
  * @param <B> type for the right coder.
  */
-public final class PairCoder<A, B> extends StructuredCoder<Pair<A, B>> {
+public final class PairCoder<A, B> implements Coder<Pair<A, B>> {
   private final Coder<A> leftCoder;
   private final Coder<B> rightCoder;
 
@@ -64,6 +57,7 @@ public final class PairCoder<A, B> extends StructuredCoder<Pair<A, B>> {
   Coder<A> getLeftCoder() {
     return leftCoder;
   }
+
   /**
    * @return the right coder.
    */
@@ -71,12 +65,10 @@ public final class PairCoder<A, B> extends StructuredCoder<Pair<A, B>> {
     return rightCoder;
   }
 
-  //=====================================================================================================
-
   @Override
   public void encode(final Pair<A, B> pair, final OutputStream outStream) throws IOException {
     if (pair == null) {
-      throw new CoderException("cannot encode a null KV");
+      throw new IOException("cannot encode a null pair");
     }
     leftCoder.encode(pair.left(), outStream);
     rightCoder.encode(pair.right(), outStream);
@@ -88,52 +80,4 @@ public final class PairCoder<A, B> extends StructuredCoder<Pair<A, B>> {
     final B value = rightCoder.decode(inStream);
     return Pair.of(key, value);
   }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return Arrays.asList(leftCoder, rightCoder);
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(this, "Key coder must be deterministic", getLeftCoder());
-    verifyDeterministic(this, "Value coder must be deterministic", getRightCoder());
-  }
-
-  @Override
-  public boolean consistentWithEquals() {
-    return leftCoder.consistentWithEquals() && rightCoder.consistentWithEquals();
-  }
-
-  @Override
-  public Object structuralValue(final Pair<A, B> pair) {
-    if (consistentWithEquals()) {
-      return pair;
-    } else {
-      return Pair.of(getLeftCoder().structuralValue(pair.left()), getRightCoder().structuralValue(pair.right()));
-    }
-  }
-
-  /**
-   * Returns whether both leftCoder and rightCoder are considered not expensive.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(final Pair<A, B> pair) {
-    return leftCoder.isRegisterByteSizeObserverCheap(pair.left())
-        && rightCoder.isRegisterByteSizeObserverCheap(pair.right());
-  }
-
-  /**
-   * Notifies ElementByteSizeObserver about the byte size of the
-   * encoded value using this coder.
-   */
-  @Override
-  public void registerByteSizeObserver(final Pair<A, B> pair,
-                                       final ElementByteSizeObserver observer) throws Exception {
-    if (pair == null) {
-      throw new CoderException("cannot encode a null Pair");
-    }
-    leftCoder.registerByteSizeObserver(pair.left(), observer);
-    rightCoder.registerByteSizeObserver(pair.right(), observer);
-  }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeTestUtil.java
similarity index 85%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeTestUtil.java
index 26b4a54..0a75b6f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeTestUtil.java
@@ -13,9 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime;
+package edu.snu.nemo.runtime.common;
 
-import org.apache.beam.sdk.values.KV;
+import edu.snu.nemo.common.Pair;
 
 import java.util.*;
 import java.util.stream.Collectors;
@@ -25,6 +25,13 @@ import java.util.stream.IntStream;
  * Utility class for runtime unit tests.
  */
 public final class RuntimeTestUtil {
+
+  /**
+   * Private constructor for utility class.
+   */
+  private RuntimeTestUtil() {
+  }
+
   /**
    * Gets a list of integer pair elements in range.
    * @param start value of the range (inclusive).
@@ -34,7 +41,7 @@ public final class RuntimeTestUtil {
   public static List getRangedNumList(final int start,
                                                final int end) {
     final List numList = new ArrayList<>(end - start);
-    IntStream.range(start, end).forEach(number -> numList.add(KV.of(number, number)));
+    IntStream.range(start, end).forEach(number -> numList.add(Pair.of(number, number)));
     return numList;
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/message/local/LocalMessageTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
similarity index 99%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/common/message/local/LocalMessageTest.java
rename to runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
index 24d86d9..a05b7d2 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/message/local/LocalMessageTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/message/local/LocalMessageTest.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.common.message.local;
+package edu.snu.nemo.runtime.common.message.local;
 
 import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
similarity index 93%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
rename to runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index d7afe45..cd2fc16 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -13,11 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.common.optimizer.pass.runtime;
+package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
-import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git a/runtime/executor/pom.xml b/runtime/executor/pom.xml
index 578b080..0997f8f 100644
--- a/runtime/executor/pom.xml
+++ b/runtime/executor/pom.xml
@@ -57,5 +57,17 @@ limitations under the License.
             <artifactId>lz4-java</artifactId>
             <version>1.4.1</version>
         </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>edu.snu.nemo</groupId>
+            <artifactId>nemo-runtime-master</artifactId>
+            <version>0.1-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
similarity index 98%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index d864819..6bac935 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -13,11 +13,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.executor.data;
+package edu.snu.nemo.runtime.executor.data;
 
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.coder.IntCoder;
+import edu.snu.nemo.common.coder.PairCoder;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder;
 import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.data.HashRange;
@@ -26,7 +28,6 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.state.BlockState;
-import edu.snu.nemo.runtime.executor.data.*;
 import edu.snu.nemo.runtime.executor.data.block.Block;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
@@ -34,9 +35,6 @@ import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.stores.*;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.values.KV;
 import org.apache.commons.io.FileUtils;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
@@ -59,7 +57,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-import static edu.snu.nemo.tests.runtime.RuntimeTestUtil.getRangedNumList;
+import static edu.snu.nemo.runtime.common.RuntimeTestUtil.getRangedNumList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -73,7 +71,7 @@ import static org.mockito.Mockito.when;
 @PrepareForTest({BlockManagerMaster.class, RuntimeMaster.class, SerializerManager.class})
 public final class BlockStoreTest {
   private static final String TMP_FILE_DIRECTORY = "./tmpFiles";
-  private static final Coder CODER = new BeamCoder(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
+  private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
   private static final Serializer SERIALIZER = new Serializer(CODER,
       Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Compression.LZ4)));
   private static final SerializerManager serializerManager = mock(SerializerManager.class);
@@ -575,7 +573,7 @@ public final class BlockStoreTest {
                                         final int start,
                                         final int end) {
     final List numList = new ArrayList<>(end - start);
-    IntStream.range(start, end).forEach(number -> numList.add(KV.of(key, number)));
+    IntStream.range(start, end).forEach(number -> numList.add(Pair.of(key, number)));
     return numList;
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
similarity index 98%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java
rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
index bc1ba30..eeb0031 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.executor.data;
+package edu.snu.nemo.runtime.executor.data;
 
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.executor.data.BlockTransferConnectionQueue;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
similarity index 98%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 6a9dbfb..a0252ca 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -13,8 +13,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.executor.datatransfer;
+package edu.snu.nemo.runtime.executor.datatransfer;
 
+import edu.snu.nemo.common.coder.IntCoder;
+import edu.snu.nemo.common.coder.PairCoder;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
@@ -28,7 +30,6 @@ import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
@@ -53,8 +54,6 @@ import edu.snu.nemo.runtime.master.RuntimeMaster;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.scheduler.*;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.commons.io.FileUtils;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
 import org.apache.reef.io.network.naming.NameResolverConfiguration;
@@ -81,8 +80,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
 import static edu.snu.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY;
-import static edu.snu.nemo.tests.runtime.RuntimeTestUtil.flatten;
-import static edu.snu.nemo.tests.runtime.RuntimeTestUtil.getRangedNumList;
+import static edu.snu.nemo.runtime.common.RuntimeTestUtil.getRangedNumList;
+import static edu.snu.nemo.runtime.common.RuntimeTestUtil.flatten;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -107,7 +106,7 @@ public final class DataTransferTest {
   private static final int PARALLELISM_TEN = 10;
   private static final String EDGE_PREFIX_TEMPLATE = "Dummy(%d)";
   private static final AtomicInteger TEST_INDEX = new AtomicInteger(0);
-  private static final Coder CODER = new BeamCoder(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
+  private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
   private static final Tang TANG = Tang.Factory.getTang();
   private static final int HASH_RANGE_MULTIPLIER = 10;
 

-- 
To stop receiving notification emails like this one, please contact
johnyangk@apache.org.