You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/06/18 04:23:25 UTC

[GitHub] sanha closed pull request #33: [NEMO-111] Remove ExecutionProperty.Key

sanha closed pull request #33: [NEMO-111] Remove ExecutionProperty.Key
URL: https://github.com/apache/incubator-nemo/pull/33
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index 2d90388d..ed051f70 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -21,9 +21,10 @@
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 import java.io.Serializable;
 import java.util.*;
@@ -244,16 +245,16 @@ private void executionPropertyCheck() {
     // SideInput is not compatible with Push
     vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
         .filter(e -> Boolean.TRUE.equals(e.isSideInput()))
-        .filter(e -> DataFlowModelProperty.Value.Push.equals(e.getProperty(ExecutionProperty.Key.DataFlowModel)))
+        .filter(e -> DataFlowModelProperty.Value.Push.equals(e.getPropertyValue(DataFlowModelProperty.class).get()))
         .forEach(e -> {
           throw new RuntimeException("DAG execution property check: "
               + "SideInput edge is not compatible with push" + e.getId());
         }));
     // DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection)
     vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
-        .filter(e -> MetricCollectionProperty.Value.DataSkewRuntimePass
-                      .equals(e.getProperty(ExecutionProperty.Key.MetricCollection)))
-        .filter(e -> DataFlowModelProperty.Value.Push.equals(e.getProperty(ExecutionProperty.Key.DataFlowModel)))
+        .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
+                      .equals(e.getPropertyValue(MetricCollectionProperty.class)))
+        .filter(e -> DataFlowModelProperty.Value.Push.equals(e.getPropertyValue(DataFlowModelProperty.class).get()))
         .forEach(e -> {
           throw new RuntimeException("DAG execution property check: "
               + "DataSizeMetricCollection edge is not compatible with push" + e.getId());
@@ -263,13 +264,14 @@ private void executionPropertyCheck() {
     vertices.stream().filter(v -> v instanceof IRVertex)
         .map(v -> (IRVertex) v)
         .forEach(v -> {
-          final Integer stageId = v.getProperty(ExecutionProperty.Key.StageId);
-          if (stageId != null) {
-            if (!stageIdToParallelismMap.containsKey(stageId)) {
-              stageIdToParallelismMap.put(stageId, v.getProperty(ExecutionProperty.Key.Parallelism));
-            } else if (!stageIdToParallelismMap.get(stageId).equals(v.getProperty(ExecutionProperty.Key.Parallelism))) {
+          final Optional<Integer> stageId = v.getPropertyValue(StageIdProperty.class);
+          if (stageId.isPresent()) {
+            if (!stageIdToParallelismMap.containsKey(stageId.get())) {
+              stageIdToParallelismMap.put(stageId.get(), v.getPropertyValue(ParallelismProperty.class).get());
+            } else if (!stageIdToParallelismMap.get(stageId.get())
+                .equals(v.getPropertyValue(ParallelismProperty.class).get())) {
               throw new RuntimeException("DAG execution property check: vertices are in a same stage, "
-                  + "but has different parallelism execution properties: Stage" + stageId + ": " + v.getId());
+                  + "but has different parallelism execution properties: Stage" + stageId.get() + ": " + v.getId());
             }
           }
         });
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
index 0778b634..b8018418 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
@@ -18,17 +18,21 @@
 import edu.snu.nemo.common.dag.Edge;
 import edu.snu.nemo.common.ir.IdManager;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.function.Consumer;
+
 /**
  * Physical execution plan of intermediate data movement.
  */
 public final class IREdge extends Edge<IRVertex> {
-  private final ExecutionPropertyMap executionProperties;
+  private final ExecutionPropertyMap<EdgeExecutionProperty> executionProperties;
   private final Boolean isSideInput;
 
   /**
@@ -68,7 +72,7 @@ public IREdge(final DataCommunicationPatternProperty.Value commPattern,
    * @param executionProperty the execution property.
    * @return the IREdge with the execution property set.
    */
-  public IREdge setProperty(final ExecutionProperty<?> executionProperty) {
+  public IREdge setProperty(final EdgeExecutionProperty<?> executionProperty) {
     executionProperties.put(executionProperty);
     return this;
   }
@@ -80,7 +84,8 @@ public IREdge setProperty(final ExecutionProperty<?> executionProperty) {
    * @param executionPropertyKey key of the execution property.
    * @return the execution property.
    */
-  public <T> T getProperty(final ExecutionProperty.Key executionPropertyKey) {
+  public <T extends Serializable> Optional<T> getPropertyValue(
+      final Class<? extends EdgeExecutionProperty<T>> executionPropertyKey) {
     return executionProperties.get(executionPropertyKey);
   }
 
@@ -112,7 +117,7 @@ public Boolean hasSameItineraryAs(final IREdge edge) {
    * @param thatEdge the edge to copy executionProperties to.
    */
   public void copyExecutionPropertiesTo(final IREdge thatEdge) {
-    this.getExecutionProperties().forEachProperties(thatEdge::setProperty);
+    this.getExecutionProperties().forEachProperties((Consumer<EdgeExecutionProperty>) thatEdge::setProperty);
   }
 
   @Override
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
index 1d2e5dc6..aae1c6e7 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CoderProperty.java
@@ -16,19 +16,19 @@
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
 import edu.snu.nemo.common.coder.Coder;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * Coder ExecutionProperty.
  */
-public final class CoderProperty extends ExecutionProperty<Coder> {
+public final class CoderProperty extends EdgeExecutionProperty<Coder> {
   /**
    * Constructor.
    *
    * @param value value of the execution property.
    */
   private CoderProperty(final Coder value) {
-    super(Key.Coder, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
index f5bfbbaf..64023198 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/CompressionProperty.java
@@ -15,18 +15,18 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * Compression ExecutionProperty.
  */
-public final class CompressionProperty extends ExecutionProperty<CompressionProperty.Compression> {
+public final class CompressionProperty extends EdgeExecutionProperty<CompressionProperty.Value> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
-  private CompressionProperty(final Compression value) {
-    super(Key.Compression, value);
+  private CompressionProperty(final Value value) {
+    super(value);
   }
 
   /**
@@ -34,14 +34,14 @@ private CompressionProperty(final Compression value) {
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
-  public static CompressionProperty of(final Compression value) {
+  public static CompressionProperty of(final Value value) {
     return new CompressionProperty(value);
   }
 
   /**
    * Possible values of Compression ExecutionProperty.
    */
-  public enum Compression {
+  public enum Value {
     Gzip,
     LZ4,
   }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataCommunicationPatternProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataCommunicationPatternProperty.java
index 39d82c4d..cab9b772 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataCommunicationPatternProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataCommunicationPatternProperty.java
@@ -15,19 +15,20 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
+// TODO #492: modularizing runtime components for data communication pattern.
 /**
  * DataCommunicationPattern ExecutionProperty.
  */
 public final class DataCommunicationPatternProperty
-    extends ExecutionProperty<DataCommunicationPatternProperty.Value>  {
+    extends EdgeExecutionProperty<DataCommunicationPatternProperty.Value> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private DataCommunicationPatternProperty(final Value value) {
-    super(Key.DataCommunicationPattern, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataFlowModelProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataFlowModelProperty.java
index 73253840..1c1a8bb4 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataFlowModelProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataFlowModelProperty.java
@@ -15,18 +15,18 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * DataFlowModel ExecutionProperty.
  */
-public final class DataFlowModelProperty extends ExecutionProperty<DataFlowModelProperty.Value> {
+public final class DataFlowModelProperty extends EdgeExecutionProperty<DataFlowModelProperty.Value> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private DataFlowModelProperty(final Value value) {
-    super(Key.DataFlowModel, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
index d07c9344..6e87844f 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
@@ -15,18 +15,18 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * DataStore ExecutionProperty.
  */
-public final class DataStoreProperty extends ExecutionProperty<DataStoreProperty.Value> {
+public final class DataStoreProperty extends EdgeExecutionProperty<DataStoreProperty.Value> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private DataStoreProperty(final Value value) {
-    super(Key.DataStore, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupProperty.java
index 80f07573..8a5f980e 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupProperty.java
@@ -15,19 +15,19 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * Invariant data ExecutionProperty. Use to indicate same data edge when unrolling loop vertex.
  * See DuplicateEdgeGroupPropertyValue
  */
-public final class DuplicateEdgeGroupProperty extends ExecutionProperty<DuplicateEdgeGroupPropertyValue> {
+public final class DuplicateEdgeGroupProperty extends EdgeExecutionProperty<DuplicateEdgeGroupPropertyValue> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private DuplicateEdgeGroupProperty(final DuplicateEdgeGroupPropertyValue value) {
-    super(Key.DuplicateEdgeGroup, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
index 0387e070..7adfc295 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyExtractorProperty.java
@@ -16,19 +16,19 @@
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
 import edu.snu.nemo.common.KeyExtractor;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * KeyExtractor ExecutionProperty.
  */
-public final class KeyExtractorProperty extends ExecutionProperty<KeyExtractor> {
+public final class KeyExtractorProperty extends EdgeExecutionProperty<KeyExtractor> {
   /**
    * Constructor.
    *
    * @param value value of the execution property.
    */
   private KeyExtractorProperty(final KeyExtractor value) {
-    super(Key.KeyExtractor, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java
index 713be995..d3892796 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java
@@ -15,18 +15,18 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * MetricCollection ExecutionProperty.
  */
-public final class MetricCollectionProperty extends ExecutionProperty<MetricCollectionProperty.Value> {
+public final class MetricCollectionProperty extends EdgeExecutionProperty<MetricCollectionProperty.Value> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private MetricCollectionProperty(final Value value) {
-    super(Key.MetricCollection, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
index 08e607ab..249433e7 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/PartitionerProperty.java
@@ -15,18 +15,18 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * Partitioner ExecutionProperty.
  */
-public final class PartitionerProperty extends ExecutionProperty<PartitionerProperty.Value> {
+public final class PartitionerProperty extends EdgeExecutionProperty<PartitionerProperty.Value> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private PartitionerProperty(final Value value) {
-    super(Key.Partitioner, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/UsedDataHandlingProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/UsedDataHandlingProperty.java
index 5b01623d..33f6f74d 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/UsedDataHandlingProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/UsedDataHandlingProperty.java
@@ -15,19 +15,19 @@
  */
 package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * UsedDataHandling ExecutionProperty.
  * This property represents the used data handling strategy.
  */
-public final class UsedDataHandlingProperty extends ExecutionProperty<UsedDataHandlingProperty.Value> {
+public final class UsedDataHandlingProperty extends EdgeExecutionProperty<UsedDataHandlingProperty.Value> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private UsedDataHandlingProperty(final Value value) {
-    super(Key.UsedDataHandling, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/EdgeExecutionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/EdgeExecutionProperty.java
new file mode 100644
index 00000000..906b72a4
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/EdgeExecutionProperty.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.executionproperty;
+
+import java.io.Serializable;
+
+/**
+ * {@link ExecutionProperty} for {@link edu.snu.nemo.common.ir.edge.IREdge}.
+ * @param <T> Type of the value.
+ */
+public abstract class EdgeExecutionProperty<T extends Serializable> extends ExecutionProperty<T> {
+  /**
+   * Default constructor.
+   * @param value value of the EdgeExecutionProperty.
+   */
+  public EdgeExecutionProperty(final T value) {
+    super(value);
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
index 45249ae4..87efce60 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionProperty.java
@@ -16,23 +16,19 @@
 package edu.snu.nemo.common.ir.executionproperty;
 
 import java.io.Serializable;
-import java.util.Objects;
 
 /**
  * An abstract class for each execution factors.
- * @param <T> Key of the value.
+ * @param <T> Type of the value.
  */
-public abstract class ExecutionProperty<T> implements Serializable {
-  private Key key;
+public abstract class ExecutionProperty<T extends Serializable> implements Serializable {
   private T value;
 
   /**
    * Default constructor.
-   * @param key key of the ExecutionProperty, given by the enum in this class.
    * @param value value of the ExecutionProperty.
    */
-  public ExecutionProperty(final Key key, final T value) {
-    this.key = key;
+  public ExecutionProperty(final T value) {
     this.value = value;
   }
 
@@ -43,23 +39,6 @@ public final T getValue() {
     return this.value;
   }
 
-  /**
-   * @return the key of the execution property.
-   */
-  public final Key getKey() {
-    return key;
-  }
-
-  /**
-   * Static method to get an empty execution property.
-   * @param <T> type of the value of the execution property.
-   * @return an empty execution property.
-   */
-  static <T> ExecutionProperty<T> emptyExecutionProperty() {
-    return new ExecutionProperty<T>(null, null) {
-    };
-  }
-
   @Override
   public final boolean equals(final Object o) {
     if (this == o) {
@@ -69,36 +48,11 @@ public final boolean equals(final Object o) {
       return false;
     }
     final ExecutionProperty<?> that = (ExecutionProperty<?>) o;
-    return getKey() == that.getKey()
-        && Objects.equals(getValue(), that.getValue());
+    return value != null ? value.equals(that.value) : that.value == null;
   }
 
   @Override
   public final int hashCode() {
-    return Objects.hash(getKey(), getValue());
-  }
-
-  /**
-   * Key for different types of execution property.
-   */
-  public enum Key {
-    // Applies to IREdge
-    DataCommunicationPattern,
-    DataFlowModel,
-    DataStore,
-    MetricCollection,
-    Partitioner,
-    KeyExtractor,
-    UsedDataHandling,
-    Compression,
-    DuplicateEdgeGroup,
-    Coder,
-
-    // Applies to IRVertex
-    DynamicOptimizationType,
-    ExecutorPlacement,
-    Parallelism,
-    ScheduleGroupIndex,
-    StageId,
+    return value != null ? value.hashCode() : 0;
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
index c3b2e222..c4b297b4 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
@@ -28,6 +28,7 @@
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
+import javax.annotation.concurrent.NotThreadSafe;
 import java.io.Serializable;
 import java.util.*;
 import java.util.function.Consumer;
@@ -35,10 +36,12 @@
 
 /**
  * ExecutionPropertyMap Class, which uses HashMap for keeping track of ExecutionProperties for vertices and edges.
+ * @param <T> Type of {@link ExecutionProperty} this map stores.
  */
-public final class ExecutionPropertyMap implements Serializable {
+@NotThreadSafe
+public final class ExecutionPropertyMap<T extends ExecutionProperty> implements Serializable {
   private final String id;
-  private final Map<ExecutionProperty.Key, ExecutionProperty<?>> properties;
+  private final Map<Class<? extends ExecutionProperty>, T> properties = new HashMap<>();
 
   /**
    * Constructor for ExecutionPropertyMap class.
@@ -47,7 +50,6 @@
   @VisibleForTesting
   public ExecutionPropertyMap(final String id) {
     this.id = id;
-    properties = new EnumMap<>(ExecutionProperty.Key.class);
   }
 
   /**
@@ -56,9 +58,10 @@ public ExecutionPropertyMap(final String id) {
    * @param commPattern Data communication pattern type of the edge.
    * @return The corresponding ExecutionPropertyMap.
    */
-  public static ExecutionPropertyMap of(final IREdge irEdge,
-                                        final DataCommunicationPatternProperty.Value commPattern) {
-    final ExecutionPropertyMap map = new ExecutionPropertyMap(irEdge.getId());
+  public static ExecutionPropertyMap<EdgeExecutionProperty> of(
+      final IREdge irEdge,
+      final DataCommunicationPatternProperty.Value commPattern) {
+    final ExecutionPropertyMap<EdgeExecutionProperty> map = new ExecutionPropertyMap<>(irEdge.getId());
     map.put(DataCommunicationPatternProperty.of(commPattern));
     map.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
     switch (commPattern) {
@@ -85,8 +88,8 @@ public static ExecutionPropertyMap of(final IREdge irEdge,
    * @param irVertex irVertex to keep the execution property of.
    * @return The corresponding ExecutionPropertyMap.
    */
-  public static ExecutionPropertyMap of(final IRVertex irVertex) {
-    final ExecutionPropertyMap map = new ExecutionPropertyMap(irVertex.getId());
+  public static ExecutionPropertyMap<VertexExecutionProperty> of(final IRVertex irVertex) {
+    final ExecutionPropertyMap<VertexExecutionProperty> map = new ExecutionPropertyMap<>(irVertex.getId());
     map.put(ParallelismProperty.of(1));
     map.put(ExecutorPlacementProperty.of(ExecutorPlacementProperty.NONE));
     return map;
@@ -103,22 +106,21 @@ public String getId() {
   /**
    * Put the given execution property  in the ExecutionPropertyMap.
    * @param executionProperty execution property to insert.
-   * @return the inserted execution property.
+   * @return the previous execution property, or null if there was no execution property with the specified property key
    */
-  public ExecutionProperty<?> put(final ExecutionProperty<?> executionProperty) {
-    return properties.put(executionProperty.getKey(), executionProperty);
+  public T put(final T executionProperty) {
+    return properties.put(executionProperty.getClass(), executionProperty);
   }
 
   /**
    * Get the value of the given execution property type.
-   * @param <T> Type of the return value.
+   * @param <U> Type of the return value.
    * @param executionPropertyKey the execution property type to find the value of.
    * @return the value of the given execution property.
    */
-  public <T> T get(final ExecutionProperty.Key executionPropertyKey) {
-    ExecutionProperty<T> property = (ExecutionProperty<T>) properties.getOrDefault(executionPropertyKey,
-     ExecutionProperty.<T>emptyExecutionProperty());
-    return property.getValue();
+  public <U extends Serializable> Optional<U> get(final Class<? extends ExecutionProperty<U>> executionPropertyKey) {
+    final ExecutionProperty<U> property = properties.get(executionPropertyKey);
+    return property == null ? Optional.empty() : Optional.of(property.getValue());
   }
 
   /**
@@ -126,7 +128,7 @@ public String getId() {
    * @param key key of the execution property to remove.
    * @return the removed execution property
    */
-  public ExecutionProperty<?> remove(final ExecutionProperty.Key key) {
+  public T remove(final Class<? extends T> key) {
     return properties.remove(key);
   }
 
@@ -134,7 +136,7 @@ public String getId() {
    * @param key key to look for.
    * @return whether or not the execution property map contains the key.
    */
-  public boolean containsKey(final ExecutionProperty.Key key) {
+  public boolean containsKey(final Class<? extends T> key) {
     return properties.containsKey(key);
   }
 
@@ -142,7 +144,7 @@ public boolean containsKey(final ExecutionProperty.Key key) {
    * Same as forEach function in Java 8, but for execution properties.
    * @param action action to apply to each of the execution properties.
    */
-  public void forEachProperties(final Consumer<? super ExecutionProperty> action) {
+  public void forEachProperties(final Consumer<? super T> action) {
     properties.values().forEach(action);
   }
 
@@ -151,7 +153,7 @@ public String toString() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{");
     boolean isFirstPair = true;
-    for (final Map.Entry<ExecutionProperty.Key, ExecutionProperty<?>> entry : properties.entrySet()) {
+    for (final Map.Entry<Class<? extends ExecutionProperty>, T> entry : properties.entrySet()) {
       if (!isFirstPair) {
         sb.append(", ");
       }
@@ -180,8 +182,8 @@ public boolean equals(final Object obj) {
     ExecutionPropertyMap that = (ExecutionPropertyMap) obj;
 
     return new EqualsBuilder()
-        .append(properties.values().stream().map(ExecutionProperty::getValue).collect(Collectors.toSet()),
-            that.properties.values().stream().map(ExecutionProperty::getValue).collect(Collectors.toSet()))
+        .append(properties.values().stream().collect(Collectors.toSet()),
+            that.properties.values().stream().collect(Collectors.toSet()))
         .isEquals();
   }
 
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/VertexExecutionProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/VertexExecutionProperty.java
new file mode 100644
index 00000000..4bdd2e6b
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/VertexExecutionProperty.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.executionproperty;
+
+import java.io.Serializable;
+
+/**
+ * {@link ExecutionProperty} for {@link edu.snu.nemo.common.ir.vertex.IRVertex}.
+ * @param <T> Type of the value.
+ */
+public abstract class VertexExecutionProperty<T extends Serializable> extends ExecutionProperty<T> {
+  /**
+   * Default constructor.
+   * @param value value of the VertexExecutionProperty.
+   */
+  public VertexExecutionProperty(final T value) {
+    super(value);
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index bcaaa9d5..de7359b8 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -18,14 +18,18 @@
 import edu.snu.nemo.common.ir.IdManager;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.dag.Vertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.function.Consumer;
 
 /**
  * The basic unit of operation in a dataflow program, as well as the most important data structure in Nemo.
  * An IRVertex is created and modified in the compiler, and executed in the runtime.
  */
 public abstract class IRVertex extends Vertex {
-  private final ExecutionPropertyMap executionProperties;
+  private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
 
   /**
    * Constructor of IRVertex.
@@ -45,7 +49,7 @@ public IRVertex() {
    * @param thatVertex the edge to copy executionProperties to.
    */
   public final void copyExecutionPropertiesTo(final IRVertex thatVertex) {
-    this.getExecutionProperties().forEachProperties(thatVertex::setProperty);
+    this.getExecutionProperties().forEachProperties((Consumer<VertexExecutionProperty>) thatVertex::setProperty);
   }
 
   /**
@@ -53,7 +57,7 @@ public final void copyExecutionPropertiesTo(final IRVertex thatVertex) {
    * @param executionProperty new execution property.
    * @return the IRVertex with the execution property set.
    */
-  public final IRVertex setProperty(final ExecutionProperty<?> executionProperty) {
+  public final IRVertex setProperty(final VertexExecutionProperty<?> executionProperty) {
     executionProperties.put(executionProperty);
     return this;
   }
@@ -64,7 +68,8 @@ public final IRVertex setProperty(final ExecutionProperty<?> executionProperty)
    * @param executionPropertyKey key of the execution property.
    * @return the execution property.
    */
-  public final <T> T getProperty(final ExecutionProperty.Key executionPropertyKey) {
+  public final <T extends Serializable> Optional<T> getPropertyValue(
+      final Class<? extends VertexExecutionProperty<T>> executionPropertyKey) {
     return executionProperties.get(executionPropertyKey);
   }
 
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
index 20d2eca1..a761c19d 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/LoopVertex.java
@@ -18,9 +18,9 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 
 import java.io.Serializable;
 import java.util.*;
@@ -217,7 +217,7 @@ public LoopVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder)
       dagBuilder.addVertex(newIrVertex, dagToAdd);
       dagToAdd.getIncomingEdgesOf(irVertex).forEach(edge -> {
         final IRVertex newSrc = originalToNewIRVertex.get(edge.getSrc());
-        final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+        final IREdge newIrEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
             newSrc, newIrVertex, edge.isSideInput());
         edge.copyExecutionPropertiesTo(newIrEdge);
         dagBuilder.connectVertices(newIrEdge);
@@ -226,7 +226,7 @@ public LoopVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder)
 
     // process DAG incoming edges.
     getDagIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
-      final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+      final IREdge newIrEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
           edge.getSrc(), originalToNewIRVertex.get(dstVertex), edge.isSideInput());
       edge.copyExecutionPropertiesTo(newIrEdge);
       dagBuilder.connectVertices(newIrEdge);
@@ -235,7 +235,7 @@ public LoopVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder)
     if (loopTerminationConditionMet()) {
       // if termination condition met, we process the DAG outgoing edge.
       getDagOutgoingEdges().forEach((srcVertex, irEdges) -> irEdges.forEach(edge -> {
-        final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+        final IREdge newIrEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
             originalToNewIRVertex.get(srcVertex), edge.getDst(), edge.isSideInput());
         edge.copyExecutionPropertiesTo(newIrEdge);
         dagBuilder.addVertex(edge.getDst()).connectVertices(newIrEdge);
@@ -246,7 +246,7 @@ public LoopVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder)
     this.getDagIncomingEdges().clear();
     this.nonIterativeIncomingEdges.forEach((dstVertex, irEdges) -> irEdges.forEach(this::addDagIncomingEdge));
     this.iterativeIncomingEdges.forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
-      final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+      final IREdge newIrEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
           originalToNewIRVertex.get(edge.getSrc()), dstVertex, edge.isSideInput());
       edge.copyExecutionPropertiesTo(newIrEdge);
       this.addDagIncomingEdge(newIrEdge);
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/DynamicOptimizationProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/DynamicOptimizationProperty.java
index d0fa611d..7fb808f4 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/DynamicOptimizationProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/DynamicOptimizationProperty.java
@@ -15,18 +15,18 @@
  */
 package edu.snu.nemo.common.ir.vertex.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
  * DynamicOptimizationType ExecutionProperty.
  */
-public final class DynamicOptimizationProperty extends ExecutionProperty<DynamicOptimizationProperty.Value> {
+public final class DynamicOptimizationProperty extends VertexExecutionProperty<DynamicOptimizationProperty.Value> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private DynamicOptimizationProperty(final Value value) {
-    super(Key.DynamicOptimizationType, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorPlacementProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorPlacementProperty.java
index dc2220e7..af22ed01 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorPlacementProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorPlacementProperty.java
@@ -15,18 +15,18 @@
  */
 package edu.snu.nemo.common.ir.vertex.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
  * ExecutionPlacement ExecutionProperty.
  */
-public final class ExecutorPlacementProperty extends ExecutionProperty<String> {
+public final class ExecutorPlacementProperty extends VertexExecutionProperty<String> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private ExecutorPlacementProperty(final String value) {
-    super(Key.ExecutorPlacement, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ParallelismProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ParallelismProperty.java
index 40d87869..d07731dd 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ParallelismProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ParallelismProperty.java
@@ -15,18 +15,18 @@
  */
 package edu.snu.nemo.common.ir.vertex.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
  * Parallelism ExecutionProperty.
  */
-public final class ParallelismProperty extends ExecutionProperty<Integer> {
+public final class ParallelismProperty extends VertexExecutionProperty<Integer> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private ParallelismProperty(final Integer value) {
-    super(Key.Parallelism, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java
index 2f19b612..08518ff9 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java
@@ -15,18 +15,18 @@
  */
 package edu.snu.nemo.common.ir.vertex.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
  * ScheduleGroupIndex ExecutionProperty.
  */
-public final class ScheduleGroupIndexProperty extends ExecutionProperty<Integer> {
+public final class ScheduleGroupIndexProperty extends VertexExecutionProperty<Integer> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private ScheduleGroupIndexProperty(final Integer value) {
-    super(Key.ScheduleGroupIndex, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java
index e8c03b27..09009922 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/StageIdProperty.java
@@ -15,18 +15,18 @@
  */
 package edu.snu.nemo.common.ir.vertex.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
  * StageId ExecutionProperty.
  */
-public final class StageIdProperty extends ExecutionProperty<Integer> {
+public final class StageIdProperty extends VertexExecutionProperty<Integer> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
   private StageIdProperty(final Integer value) {
-    super(Key.StageId, value);
+    super(value);
   }
 
   /**
diff --git a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
index 6ca8295b..f7eaad7f 100644
--- a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -21,8 +21,9 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
@@ -31,7 +32,7 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
 
 /**
  * Test {@link ExecutionPropertyMap}.
@@ -42,8 +43,8 @@
   private final DataCommunicationPatternProperty.Value comPattern = DataCommunicationPatternProperty.Value.OneToOne;
   private final IREdge edge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, source, destination);
 
-  private ExecutionPropertyMap edgeMap;
-  private ExecutionPropertyMap vertexMap;
+  private ExecutionPropertyMap<EdgeExecutionProperty> edgeMap;
+  private ExecutionPropertyMap<VertexExecutionProperty> vertexMap;
 
   @Before
   public void setUp() {
@@ -53,8 +54,8 @@ public void setUp() {
 
   @Test
   public void testDefaultValues() {
-    assertEquals(comPattern, edgeMap.get(ExecutionProperty.Key.DataCommunicationPattern));
-    assertEquals(1, vertexMap.<Integer>get(ExecutionProperty.Key.Parallelism).longValue());
+    assertEquals(comPattern, edgeMap.get(DataCommunicationPatternProperty.class).get());
+    assertEquals(1, vertexMap.get(ParallelismProperty.class).get().longValue());
     assertEquals(edge.getId(), edgeMap.getId());
     assertEquals(source.getId(), vertexMap.getId());
   }
@@ -62,16 +63,16 @@ public void testDefaultValues() {
   @Test
   public void testPutGetAndRemove() {
     edgeMap.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
-    assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(ExecutionProperty.Key.DataStore));
+    assertEquals(DataStoreProperty.Value.MemoryStore, edgeMap.get(DataStoreProperty.class).get());
     edgeMap.put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
-    assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(ExecutionProperty.Key.DataFlowModel));
+    assertEquals(DataFlowModelProperty.Value.Pull, edgeMap.get(DataFlowModelProperty.class).get());
     edgeMap.put(CoderProperty.of(Coder.DUMMY_CODER));
-    assertEquals(Coder.DUMMY_CODER, edgeMap.get(ExecutionProperty.Key.Coder));
+    assertEquals(Coder.DUMMY_CODER, edgeMap.get(CoderProperty.class).get());
 
-    edgeMap.remove(ExecutionProperty.Key.DataFlowModel);
-    assertNull(edgeMap.get(ExecutionProperty.Key.DataFlowModel));
+    edgeMap.remove(DataFlowModelProperty.class);
+    assertFalse(edgeMap.get(DataFlowModelProperty.class).isPresent());
 
     vertexMap.put(ParallelismProperty.of(100));
-    assertEquals(100, vertexMap.<Integer>get(ExecutionProperty.Key.Parallelism).longValue());
+    assertEquals(100, vertexMap.get(ParallelismProperty.class).get().longValue());
   }
 }
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
index 0b61d7b5..5065f9ce 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
@@ -20,7 +20,7 @@ import java.util
 import edu.snu.nemo.common.dag.DAGBuilder
 import edu.snu.nemo.common.ir.edge.IREdge
 import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
 import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
@@ -72,7 +72,8 @@ final class PairRDDFunctions[K: ClassTag, V: ClassTag] protected[rdd] (
     val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(self.lastVertex, reduceByKeyVertex),
       self.lastVertex, reduceByKeyVertex)
     newEdge.setProperty(
-      CoderProperty.of(new SparkCoder[Tuple2[K, V]](self.serializer)).asInstanceOf[ExecutionProperty[_]])
+      CoderProperty.of(new SparkCoder[Tuple2[K, V]](self.serializer))
+        .asInstanceOf[EdgeExecutionProperty[_ <: Serializable]])
     newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
     builder.connectVertices(newEdge)
 
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index 12318c55..135f6980 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -21,7 +21,7 @@ import edu.snu.nemo.client.JobLauncher
 import edu.snu.nemo.common.dag.{DAG, DAGBuilder}
 import edu.snu.nemo.common.ir.edge.IREdge
 import edu.snu.nemo.common.ir.edge.executionproperty.{CoderProperty, KeyExtractorProperty}
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
 import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
@@ -51,8 +51,8 @@ final class RDD[T: ClassTag] protected[rdd] (
 
   protected[rdd] val serializer: Serializer = SparkFrontendUtils.deriveSerializerFrom(_sc)
   private val loopVertexStack = new util.Stack[LoopVertex]
-  private val coderProperty: ExecutionProperty[_] =
-    CoderProperty.of(new SparkCoder[T](serializer)).asInstanceOf[ExecutionProperty[_]]
+  private val coderProperty: EdgeExecutionProperty[_ <: Serializable] =
+    CoderProperty.of(new SparkCoder[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]
   private val keyExtractorProperty: KeyExtractorProperty = KeyExtractorProperty.of(new SparkKeyExtractor)
 
   /**
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java
index eab7a297..72cbd010 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/CompileTimePass.java
@@ -33,5 +33,5 @@
    * Getter for prerequisite execution properties.
    * @return set of prerequisite execution properties.
    */
-  Set<ExecutionProperty.Key> getPrerequisiteExecutionProperties();
+  Set<Class<? extends ExecutionProperty>> getPrerequisiteExecutionProperties();
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AnnotatingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AnnotatingPass.java
index 21aa6643..b7d36907 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AnnotatingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/AnnotatingPass.java
@@ -18,7 +18,7 @@
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
 
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.Set;
 
 /**
@@ -26,16 +26,16 @@
  * It is ensured by the compiler that the shape of the IR DAG itself is not modified by an AnnotatingPass.
  */
 public abstract class AnnotatingPass implements CompileTimePass {
-  private final ExecutionProperty.Key keyOfExecutionPropertyToModify;
-  private final Set<ExecutionProperty.Key> prerequisiteExecutionProperties;
+  private final Class<? extends ExecutionProperty> keyOfExecutionPropertyToModify;
+  private final Set<Class<? extends ExecutionProperty>> prerequisiteExecutionProperties;
 
   /**
    * Constructor.
    * @param keyOfExecutionPropertyToModify key of execution property to modify.
    * @param prerequisiteExecutionProperties prerequisite execution properties.
    */
-  public AnnotatingPass(final ExecutionProperty.Key keyOfExecutionPropertyToModify,
-                        final Set<ExecutionProperty.Key> prerequisiteExecutionProperties) {
+  public AnnotatingPass(final Class<? extends ExecutionProperty> keyOfExecutionPropertyToModify,
+                        final Set<Class<? extends ExecutionProperty>> prerequisiteExecutionProperties) {
     this.keyOfExecutionPropertyToModify = keyOfExecutionPropertyToModify;
     this.prerequisiteExecutionProperties = prerequisiteExecutionProperties;
   }
@@ -44,21 +44,20 @@ public AnnotatingPass(final ExecutionProperty.Key keyOfExecutionPropertyToModify
    * Constructor.
    * @param keyOfExecutionPropertyToModify key of execution property to modify.
    */
-  public AnnotatingPass(final ExecutionProperty.Key keyOfExecutionPropertyToModify) {
-    this.keyOfExecutionPropertyToModify = keyOfExecutionPropertyToModify;
-    this.prerequisiteExecutionProperties = new HashSet<>();
+  public AnnotatingPass(final Class<? extends ExecutionProperty> keyOfExecutionPropertyToModify) {
+    this(keyOfExecutionPropertyToModify, Collections.emptySet());
   }
 
   /**
    * Getter for key of execution property to modify.
    * @return key of execution property to modify.
    */
-  public final ExecutionProperty.Key getExecutionPropertyToModify() {
+  public final Class<? extends ExecutionProperty> getExecutionPropertyToModify() {
     return keyOfExecutionPropertyToModify;
   }
 
   @Override
-  public final Set<ExecutionProperty.Key> getPrerequisiteExecutionProperties() {
+  public final Set<Class<? extends ExecutionProperty>> getPrerequisiteExecutionProperties() {
     return prerequisiteExecutionProperties;
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
index 57731599..1d1a651d 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/CompressionPass.java
@@ -18,38 +18,38 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 
 /**
  * A pass for applying compression algorithm for data flowing between vertices.
  */
 public final class CompressionPass extends AnnotatingPass {
-  private final CompressionProperty.Compression compression;
+  private final CompressionProperty.Value compression;
 
   /**
    * Default constructor. Uses LZ4 as default.
    */
   public CompressionPass() {
-    super(ExecutionProperty.Key.Compression);
-    this.compression = CompressionProperty.Compression.LZ4;
+    super(CompressionProperty.class);
+    this.compression = CompressionProperty.Value.LZ4;
   }
 
   /**
    * Constructor.
    * @param compression Compression to apply on edges.
    */
-  public CompressionPass(final CompressionProperty.Compression compression) {
-    super(ExecutionProperty.Key.Compression);
+  public CompressionPass(final CompressionProperty.Value compression) {
+    super(CompressionProperty.class);
     this.compression = compression;
   }
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex).stream()
-        .filter(e -> !vertex.getProperty(ExecutionProperty.Key.StageId)
-            .equals(e.getSrc().getProperty(ExecutionProperty.Key.StageId)))
+        .filter(e -> !vertex.getPropertyValue(StageIdProperty.class).get()
+            .equals(e.getSrc().getPropertyValue(StageIdProperty.class).get()))
         .forEach(edge -> edge.setProperty(CompressionProperty.of(compression))));
 
     return dag;
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
index 2c344254..a0428a60 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeDataStorePass.java
@@ -19,7 +19,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 
 /**
@@ -32,7 +31,7 @@
    * Default constructor.
    */
   public DataSkewEdgeDataStorePass() {
-    super(ExecutionProperty.Key.DataStore);
+    super(DataStoreProperty.class);
   }
 
   @Override
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeMetricCollectionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeMetricCollectionPass.java
index 1d04e043..614d81b7 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeMetricCollectionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgeMetricCollectionPass.java
@@ -20,11 +20,9 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.Collections;
 
 /**
  * Pass to annotate the DAG for a job to perform data skew.
@@ -36,9 +34,7 @@
    * Default constructor.
    */
   public DataSkewEdgeMetricCollectionPass() {
-    super(ExecutionProperty.Key.MetricCollection, Stream.of(
-        ExecutionProperty.Key.DataCommunicationPattern
-    ).collect(Collectors.toSet()));
+    super(MetricCollectionProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -48,7 +44,7 @@ public DataSkewEdgeMetricCollectionPass() {
       if (v instanceof MetricCollectionBarrierVertex) {
         dag.getOutgoingEdgesOf(v).forEach(edge -> {
           // double checking.
-          if (edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)
+          if (edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
               .equals(DataCommunicationPatternProperty.Value.Shuffle)) {
             edge.setProperty(MetricCollectionProperty.of(MetricCollectionProperty.Value.DataSkewRuntimePass));
           }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgePartitionerPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgePartitionerPass.java
index 71e11610..4fcfe41f 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgePartitionerPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewEdgePartitionerPass.java
@@ -20,7 +20,6 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 
 import java.util.Collections;
@@ -34,7 +33,7 @@
    * Default constructor.
    */
   public DataSkewEdgePartitionerPass() {
-    super(ExecutionProperty.Key.Partitioner, Collections.singleton(ExecutionProperty.Key.MetricCollection));
+    super(PartitionerProperty.class, Collections.singleton(MetricCollectionProperty.class));
   }
 
   @Override
@@ -45,7 +44,7 @@ public DataSkewEdgePartitionerPass() {
         outEdges.forEach(edge -> {
           // double checking.
           if (MetricCollectionProperty.Value.DataSkewRuntimePass
-            .equals(edge.getProperty(ExecutionProperty.Key.MetricCollection))) {
+            .equals(edge.getPropertyValue(MetricCollectionProperty.class).get())) {
             edge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner));
           }
         });
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewVertexPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewVertexPass.java
index 6955bcae..c9e7aa01 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewVertexPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewVertexPass.java
@@ -19,7 +19,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 
 /**
@@ -31,7 +30,7 @@
    * Default constructor.
    */
   public DataSkewVertexPass() {
-    super(ExecutionProperty.Key.DynamicOptimizationType);
+    super(DynamicOptimizationProperty.class);
   }
 
   @Override
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
index 02ace96c..eade93d1 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPass.java
@@ -19,11 +19,8 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
-import java.util.Collections;
-
 /**
  * Pass for initiating IREdge Coder ExecutionProperty with default dummy coder.
  */
@@ -35,14 +32,14 @@
    * Default constructor.
    */
   public DefaultEdgeCoderPass() {
-    super(ExecutionProperty.Key.Coder, Collections.emptySet());
+    super(CoderProperty.class);
   }
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
-          if (irEdge.getProperty(ExecutionProperty.Key.Coder) == null) {
+          if (!irEdge.getPropertyValue(CoderProperty.class).isPresent()) {
             irEdge.setProperty(DEFAULT_CODER_PROPERTY);
           }
         }));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
index 3acf173c..81e16f74 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeUsedDataHandlingPass.java
@@ -19,7 +19,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.UsedDataHandlingProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.util.Collections;
@@ -33,15 +32,15 @@
    * Default constructor.
    */
   public DefaultEdgeUsedDataHandlingPass() {
-    super(ExecutionProperty.Key.UsedDataHandling, Collections.singleton(ExecutionProperty.Key.DataStore));
+    super(UsedDataHandlingProperty.class, Collections.singleton(DataStoreProperty.class));
   }
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
-          if (irEdge.getProperty(ExecutionProperty.Key.UsedDataHandling) == null) {
-            final DataStoreProperty.Value dataStoreValue = irEdge.getProperty(ExecutionProperty.Key.DataStore);
+          if (!irEdge.getPropertyValue(UsedDataHandlingProperty.class).isPresent()) {
+            final DataStoreProperty.Value dataStoreValue = irEdge.getPropertyValue(DataStoreProperty.class).get();
             if (DataStoreProperty.Value.MemoryStore.equals(dataStoreValue)
                 || DataStoreProperty.Value.SerializedMemoryStore.equals(dataStoreValue)) {
               irEdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Discard));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
index 6b90111c..d26320ff 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
@@ -21,14 +21,11 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 
 import java.util.Collections;
 import java.util.List;
 
-import static edu.snu.nemo.common.ir.executionproperty.ExecutionProperty.Key.DataCommunicationPattern;
-
 /**
  * Optimization pass for tagging parallelism execution property.
  */
@@ -52,7 +49,7 @@ public DefaultParallelismPass() {
    */
   public DefaultParallelismPass(final int desiredSourceParallelism,
                                 final int shuffleDecreaseFactor) {
-    super(ExecutionProperty.Key.Parallelism, Collections.singleton(DataCommunicationPattern));
+    super(ParallelismProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
     this.desiredSourceParallelism = desiredSourceParallelism;
     this.shuffleDecreaseFactor = shuffleDecreaseFactor;
   }
@@ -68,7 +65,7 @@ public DefaultParallelismPass(final int desiredSourceParallelism,
           // After that, we set the parallelism as the number of split readers.
           // (It can be more/less than the desired value.)
           final SourceVertex sourceVertex = (SourceVertex) vertex;
-          final Integer originalParallelism = vertex.getProperty(ExecutionProperty.Key.Parallelism);
+          final Integer originalParallelism = vertex.getPropertyValue(ParallelismProperty.class).get();
           // We manipulate them if it is set as default value of 1.
           if (originalParallelism.equals(1)) {
             vertex.setProperty(ParallelismProperty.of(
@@ -78,14 +75,14 @@ public DefaultParallelismPass(final int desiredSourceParallelism,
           // No reason to propagate via Broadcast edges, as the data streams that will use the broadcasted data
           // as a sideInput will have their own number of parallelism
           final Integer o2oParallelism = inEdges.stream()
-              .filter(edge -> DataCommunicationPatternProperty.Value.OneToOne
-                  .equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))
-              .mapToInt(edge -> edge.getSrc().getProperty(ExecutionProperty.Key.Parallelism))
+             .filter(edge -> DataCommunicationPatternProperty.Value.OneToOne
+                  .equals(edge.getPropertyValue(DataCommunicationPatternProperty.class).get()))
+              .mapToInt(edge -> edge.getSrc().getPropertyValue(ParallelismProperty.class).get())
               .max().orElse(1);
           final Integer shuffleParallelism = inEdges.stream()
               .filter(edge -> DataCommunicationPatternProperty.Value.Shuffle
-                  .equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))
-              .mapToInt(edge -> edge.getSrc().getProperty(ExecutionProperty.Key.Parallelism))
+                  .equals(edge.getPropertyValue(DataCommunicationPatternProperty.class).get()))
+              .mapToInt(edge -> edge.getSrc().getPropertyValue(ParallelismProperty.class).get())
               .map(i -> i / shuffleDecreaseFactor)
               .max().orElse(1);
           // We set the greater value as the parallelism.
@@ -93,7 +90,7 @@ public DefaultParallelismPass(final int desiredSourceParallelism,
           vertex.setProperty(ParallelismProperty.of(parallelism));
           // synchronize one-to-one edges parallelism
           recursivelySynchronizeO2OParallelism(dag, vertex, parallelism);
-        } else if (vertex.getProperty(ExecutionProperty.Key.Parallelism) == null) {
+        } else if (!vertex.getPropertyValue(ParallelismProperty.class).isPresent()) {
           throw new RuntimeException("There is a non-source vertex that doesn't have any inEdges "
               + "(excluding SideInput edges)");
         } // No problem otherwise.
@@ -117,12 +114,12 @@ static Integer recursivelySynchronizeO2OParallelism(final DAG<IRVertex, IREdge>
     final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
     final Integer ancestorParallelism = inEdges.stream()
         .filter(edge -> DataCommunicationPatternProperty.Value.OneToOne
-            .equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))
+            .equals(edge.getPropertyValue(DataCommunicationPatternProperty.class).get()))
         .map(IREdge::getSrc)
         .mapToInt(inVertex -> recursivelySynchronizeO2OParallelism(dag, inVertex, parallelism))
         .max().orElse(1);
     final Integer maxParallelism = ancestorParallelism > parallelism ? ancestorParallelism : parallelism;
-    final Integer myParallelism = vertex.getProperty(ExecutionProperty.Key.Parallelism);
+    final Integer myParallelism = vertex.getPropertyValue(ParallelismProperty.class).get();
 
     // update the vertex with the max value.
     if (maxParallelism > myParallelism) {
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
index e9c256c8..7638e5d7 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
@@ -18,9 +18,12 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 import java.util.ArrayList;
@@ -42,12 +45,12 @@
    * Default constructor.
    */
   public DefaultStagePartitioningPass() {
-    super(ExecutionProperty.Key.StageId, Stream.of(
-        ExecutionProperty.Key.DataCommunicationPattern,
-        ExecutionProperty.Key.ExecutorPlacement,
-        ExecutionProperty.Key.DataFlowModel,
-        ExecutionProperty.Key.Partitioner,
-        ExecutionProperty.Key.Parallelism
+    super(StageIdProperty.class, Stream.of(
+        DataCommunicationPatternProperty.class,
+        ExecutorPlacementProperty.class,
+        DataFlowModelProperty.class,
+        PartitionerProperty.class,
+        ParallelismProperty.class
     ).collect(Collectors.toSet()));
   }
 
@@ -85,17 +88,17 @@ public DefaultStagePartitioningPass() {
         // Filter candidate incoming edges that can be included in a stage with the vertex.
         final Optional<List<IREdge>> inEdgesForStage = inEdgeList.map(e -> e.stream()
             // One to one edges
-            .filter(edge -> edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)
+            .filter(edge -> edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
                               .equals(DataCommunicationPatternProperty.Value.OneToOne))
             // MemoryStore placement
-            .filter(edge -> edge.getProperty(ExecutionProperty.Key.DataStore)
+            .filter(edge -> edge.getPropertyValue(DataStoreProperty.class).get()
                               .equals(DataStoreProperty.Value.MemoryStore))
             // if src and dst are placed on same container types
-            .filter(edge -> edge.getSrc().getProperty(ExecutionProperty.Key.ExecutorPlacement)
-                .equals(edge.getDst().getProperty(ExecutionProperty.Key.ExecutorPlacement)))
+            .filter(edge -> edge.getSrc().getPropertyValue(ExecutorPlacementProperty.class).get()
+                .equals(edge.getDst().getPropertyValue(ExecutorPlacementProperty.class).get()))
             // if src and dst have same parallelism
-            .filter(edge -> edge.getSrc().getProperty(ExecutionProperty.Key.Parallelism)
-                .equals(edge.getDst().getProperty(ExecutionProperty.Key.Parallelism)))
+            .filter(edge -> edge.getSrc().getPropertyValue(ParallelismProperty.class).get()
+                .equals(edge.getDst().getPropertyValue(ParallelismProperty.class).get()))
             // Src that is already included in a stage
             .filter(edge -> vertexStageNumHashMap.containsKey(edge.getSrc()))
             .collect(Collectors.toList()));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
index 2b2f712d..719de834 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
@@ -18,7 +18,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 
 import java.util.Collections;
@@ -33,7 +32,7 @@
    * Default constructor.
    */
   public DisaggregationEdgeDataStorePass() {
-    super(ExecutionProperty.Key.DataStore, Collections.singleton(ExecutionProperty.Key.DataStore));
+    super(DataStoreProperty.class, Collections.singleton(DataStoreProperty.class));
   }
 
   @Override
@@ -42,7 +41,7 @@ public DisaggregationEdgeDataStorePass() {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       inEdges.forEach(edge -> {
         if (DataStoreProperty.Value.LocalFileStore
-              .equals(edge.getProperty(ExecutionProperty.Key.DataStore))) {
+              .equals(edge.getPropertyValue(DataStoreProperty.class).get())) {
           edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore));
         }
       });
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java
index 06ab3d30..c4db8023 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DuplicateEdgeGroupSizePass.java
@@ -17,11 +17,12 @@
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.util.HashMap;
+import java.util.Optional;
 
 /**
  * A pass for annotate duplicate data for each edge.
@@ -32,7 +33,7 @@
    * Default constructor.
    */
   public DuplicateEdgeGroupSizePass() {
-    super(ExecutionProperty.Key.DuplicateEdgeGroup);
+    super(DuplicateEdgeGroupProperty.class);
   }
 
   @Override
@@ -40,10 +41,10 @@ public DuplicateEdgeGroupSizePass() {
     final HashMap<String, Integer> groupIdToGroupSize = new HashMap<>();
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex)
         .forEach(e -> {
-          final DuplicateEdgeGroupPropertyValue duplicateEdgeGroupProperty =
-              e.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
-          if (duplicateEdgeGroupProperty != null) {
-            final String groupId = duplicateEdgeGroupProperty.getGroupId();
+          final Optional<DuplicateEdgeGroupPropertyValue> duplicateEdgeGroupProperty =
+              e.getPropertyValue(DuplicateEdgeGroupProperty.class);
+          if (duplicateEdgeGroupProperty.isPresent()) {
+            final String groupId = duplicateEdgeGroupProperty.get().getGroupId();
             final Integer currentCount = groupIdToGroupSize.getOrDefault(groupId, 0);
             groupIdToGroupSize.put(groupId, currentCount + 1);
           }
@@ -51,12 +52,12 @@ public DuplicateEdgeGroupSizePass() {
 
     dag.topologicalDo(vertex -> dag.getIncomingEdgesOf(vertex)
         .forEach(e -> {
-          final DuplicateEdgeGroupPropertyValue duplicateEdgeGroupProperty =
-              e.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
-          if (duplicateEdgeGroupProperty != null) {
-            final String groupId = duplicateEdgeGroupProperty.getGroupId();
+          final Optional<DuplicateEdgeGroupPropertyValue> duplicateEdgeGroupProperty =
+              e.getPropertyValue(DuplicateEdgeGroupProperty.class);
+          if (duplicateEdgeGroupProperty.isPresent()) {
+            final String groupId = duplicateEdgeGroupProperty.get().getGroupId();
             if (groupIdToGroupSize.containsKey(groupId)) {
-              duplicateEdgeGroupProperty.setGroupSize(groupIdToGroupSize.get(groupId));
+              duplicateEdgeGroupProperty.get().setGroupSize(groupIdToGroupSize.get(groupId));
             }
           }
         }));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataFlowModelPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataFlowModelPass.java
index c110b3ba..b74a995e 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataFlowModelPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataFlowModelPass.java
@@ -18,12 +18,11 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 
+import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.PadoEdgeDataStorePass.fromTransientToReserved;
 
@@ -35,9 +34,7 @@
    * Default constructor.
    */
   public PadoEdgeDataFlowModelPass() {
-    super(ExecutionProperty.Key.DataFlowModel, Stream.of(
-        ExecutionProperty.Key.ExecutorPlacement
-    ).collect(Collectors.toSet()));
+    super(DataFlowModelProperty.class, Collections.singleton(ExecutorPlacementProperty.class));
   }
 
   @Override
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
index e0ca2390..0777c1c9 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoEdgeDataStorePass.java
@@ -19,13 +19,11 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 
+import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Pado pass for tagging edges with DataStore ExecutionProperty.
@@ -35,9 +33,7 @@
    * Default constructor.
    */
   public PadoEdgeDataStorePass() {
-    super(ExecutionProperty.Key.DataStore, Stream.of(
-        ExecutionProperty.Key.ExecutorPlacement
-    ).collect(Collectors.toSet()));
+    super(DataStoreProperty.class, Collections.singleton(ExecutorPlacementProperty.class));
   }
 
   @Override
@@ -49,7 +45,7 @@ public PadoEdgeDataStorePass() {
           if (fromTransientToReserved(edge) || fromReservedToTransient(edge)) {
             edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
           } else if (DataCommunicationPatternProperty.Value.OneToOne
-              .equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))) {
+              .equals(edge.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
             edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
           } else {
             edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
@@ -67,9 +63,9 @@ public PadoEdgeDataStorePass() {
    */
   static boolean fromTransientToReserved(final IREdge irEdge) {
     return ExecutorPlacementProperty.TRANSIENT
-        .equals(irEdge.getSrc().getProperty(ExecutionProperty.Key.ExecutorPlacement))
+        .equals(irEdge.getSrc().getPropertyValue(ExecutorPlacementProperty.class).get())
         && ExecutorPlacementProperty.RESERVED
-        .equals(irEdge.getDst().getProperty(ExecutionProperty.Key.ExecutorPlacement));
+        .equals(irEdge.getDst().getPropertyValue(ExecutorPlacementProperty.class).get());
   }
 
   /**
@@ -79,8 +75,8 @@ static boolean fromTransientToReserved(final IREdge irEdge) {
    */
   static boolean fromReservedToTransient(final IREdge irEdge) {
     return ExecutorPlacementProperty.RESERVED
-        .equals(irEdge.getSrc().getProperty(ExecutionProperty.Key.ExecutorPlacement))
+        .equals(irEdge.getSrc().getPropertyValue(ExecutorPlacementProperty.class).get())
         && ExecutorPlacementProperty.TRANSIENT
-        .equals(irEdge.getDst().getProperty(ExecutionProperty.Key.ExecutorPlacement));
+        .equals(irEdge.getDst().getPropertyValue(ExecutorPlacementProperty.class).get());
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoVertexExecutorPlacementPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoVertexExecutorPlacementPass.java
index 431f6927..64e4dfb4 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoVertexExecutorPlacementPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/PadoVertexExecutorPlacementPass.java
@@ -19,12 +19,10 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 
+import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Pado pass for tagging vertices.
@@ -34,9 +32,7 @@
    * Default constructor.
    */
   public PadoVertexExecutorPlacementPass() {
-    super(ExecutionProperty.Key.ExecutorPlacement, Stream.of(
-        ExecutionProperty.Key.DataCommunicationPattern
-    ).collect(Collectors.toSet()));
+    super(ExecutorPlacementProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -63,7 +59,7 @@ public PadoVertexExecutorPlacementPass() {
    */
   private boolean hasM2M(final List<IREdge> irEdges) {
     return irEdges.stream().anyMatch(edge ->
-        edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)
+        edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
           .equals(DataCommunicationPatternProperty.Value.Shuffle));
   }
 
@@ -75,8 +71,8 @@ private boolean hasM2M(final List<IREdge> irEdges) {
   private boolean allO2OFromReserved(final List<IREdge> irEdges) {
     return irEdges.stream()
         .allMatch(edge -> DataCommunicationPatternProperty.Value.OneToOne.equals(
-            edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))
-            && edge.getSrc().getProperty(ExecutionProperty.Key.ExecutorPlacement).equals(
+            edge.getPropertyValue(DataCommunicationPatternProperty.class).get())
+            && edge.getSrc().getPropertyValue(ExecutorPlacementProperty.class).get().equals(
                 ExecutorPlacementProperty.RESERVED));
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java
index 0e066c40..35c8c5ae 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ReviseInterStageEdgeDataStorePass.java
@@ -18,12 +18,11 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
+import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Edge data store pass to process inter-stage memory store edges.
@@ -33,9 +32,7 @@
    * Default constructor.
    */
   public ReviseInterStageEdgeDataStorePass() {
-    super(ExecutionProperty.Key.DataStore, Stream.of(
-        ExecutionProperty.Key.StageId
-    ).collect(Collectors.toSet()));
+    super(DataStoreProperty.class, Collections.singleton(StageIdProperty.class));
   }
 
   @Override
@@ -44,9 +41,9 @@ public ReviseInterStageEdgeDataStorePass() {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       if (!inEdges.isEmpty()) {
         inEdges.forEach(edge -> {
-          if (DataStoreProperty.Value.MemoryStore.equals(edge.getProperty(ExecutionProperty.Key.DataStore))
-              && !edge.getSrc().getProperty(ExecutionProperty.Key.StageId)
-              .equals(edge.getDst().getProperty(ExecutionProperty.Key.StageId))) {
+          if (DataStoreProperty.Value.MemoryStore.equals(edge.getPropertyValue(DataStoreProperty.class).get())
+              && !edge.getSrc().getPropertyValue(StageIdProperty.class).get()
+              .equals(edge.getDst().getPropertyValue(StageIdProperty.class).get())) {
             edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
           }
         });
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataFlowModelPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataFlowModelPass.java
index 8ff037bf..95154b4a 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataFlowModelPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataFlowModelPass.java
@@ -19,7 +19,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 
 import java.util.Collections;
@@ -34,7 +33,7 @@
    * Default constructor.
    */
   public SailfishEdgeDataFlowModelPass() {
-    super(ExecutionProperty.Key.DataFlowModel, Collections.singleton(ExecutionProperty.Key.DataCommunicationPattern));
+    super(DataFlowModelProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -42,7 +41,7 @@ public SailfishEdgeDataFlowModelPass() {
     dag.getVertices().forEach(vertex -> {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       inEdges.forEach(edge -> {
-        if (edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)
+        if (edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
             .equals(DataCommunicationPatternProperty.Value.Shuffle)) {
           edge.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push)); // Push to the merger vertex.
         } else {
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
index fe5add04..6fae5efb 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDataStorePass.java
@@ -19,7 +19,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 
 import java.util.Collections;
@@ -33,7 +32,7 @@
    * Default constructor.
    */
   public SailfishEdgeDataStorePass() {
-    super(ExecutionProperty.Key.DataStore, Collections.singleton(ExecutionProperty.Key.DataCommunicationPattern));
+    super(DataStoreProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -42,10 +41,10 @@ public SailfishEdgeDataStorePass() {
       // Find the merger vertex inserted by reshaping pass.
       if (dag.getIncomingEdgesOf(vertex).stream().anyMatch(irEdge ->
               DataCommunicationPatternProperty.Value.Shuffle
-          .equals(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))) {
+          .equals(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get()))) {
         dag.getIncomingEdgesOf(vertex).forEach(edgeToMerger -> {
           if (DataCommunicationPatternProperty.Value.Shuffle
-          .equals(edgeToMerger.getProperty(ExecutionProperty.Key.DataCommunicationPattern))) {
+          .equals(edgeToMerger.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
             // Pass data through memory to the merger vertex.
             edgeToMerger.setProperty(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
           }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeUsedDataHandlingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeUsedDataHandlingPass.java
index 9e69f7f4..dd3a532f 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeUsedDataHandlingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeUsedDataHandlingPass.java
@@ -19,7 +19,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.UsedDataHandlingProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.util.Collections;
@@ -34,14 +33,14 @@
    * Default constructor.
    */
   public SailfishEdgeUsedDataHandlingPass() {
-    super(ExecutionProperty.Key.UsedDataHandling, Collections.singleton(ExecutionProperty.Key.DataFlowModel));
+    super(UsedDataHandlingProperty.class, Collections.singleton(DataFlowModelProperty.class));
   }
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(irVertex ->
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
-          final DataFlowModelProperty.Value dataFlowModel = irEdge.getProperty(ExecutionProperty.Key.DataFlowModel);
+          final DataFlowModelProperty.Value dataFlowModel = irEdge.getPropertyValue(DataFlowModelProperty.class).get();
           if (DataFlowModelProperty.Value.Push.equals(dataFlowModel)) {
             irEdge.setProperty(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Discard));
           }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java
index 6a484706..e76f4ba1 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPass.java
@@ -18,18 +18,19 @@
 import com.google.common.collect.Lists;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static edu.snu.nemo.common.ir.executionproperty.ExecutionProperty.Key.DataFlowModel;
-import static edu.snu.nemo.common.ir.executionproperty.ExecutionProperty.Key.StageId;
-
 /**
  * A pass for assigning each stages in schedule groups.
  * We traverse the DAG topologically to find the dependency information between stages and number them appropriately
@@ -40,13 +41,13 @@
    * Default constructor.
    */
   public ScheduleGroupPass() {
-    super(ExecutionProperty.Key.ScheduleGroupIndex, Stream.of(
-        ExecutionProperty.Key.StageId,
-        ExecutionProperty.Key.DataCommunicationPattern,
-        ExecutionProperty.Key.ExecutorPlacement,
-        ExecutionProperty.Key.DataFlowModel,
-        ExecutionProperty.Key.Partitioner,
-        ExecutionProperty.Key.Parallelism
+    super(ScheduleGroupIndexProperty.class, Stream.of(
+        StageIdProperty.class,
+        DataCommunicationPatternProperty.class,
+        ExecutorPlacementProperty.class,
+        DataFlowModelProperty.class,
+        PartitionerProperty.class,
+        ParallelismProperty.class
     ).collect(Collectors.toSet()));
   }
 
@@ -55,7 +56,8 @@ public ScheduleGroupPass() {
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     // We assume that the input dag is tagged with stage ids.
-    if (dag.getVertices().stream().anyMatch(irVertex -> irVertex.getProperty(StageId) == null)) {
+    if (dag.getVertices().stream()
+        .anyMatch(irVertex -> !irVertex.getPropertyValue(StageIdProperty.class).isPresent())) {
       throw new RuntimeException("There exists an IR vertex going through ScheduleGroupPass "
           + "without stage id tagged.");
     }
@@ -63,12 +65,12 @@ public ScheduleGroupPass() {
     // Map of stage id to the stage ids that it depends on.
     final Map<Integer, Set<Integer>> dependentStagesMap = new HashMap<>();
     dag.topologicalDo(irVertex -> {
-      final Integer currentStageId = irVertex.getProperty(StageId);
+      final Integer currentStageId = irVertex.getPropertyValue(StageIdProperty.class).get();
       dependentStagesMap.putIfAbsent(currentStageId, new HashSet<>());
       // while traversing, we find the stages that point to the current stage and add them to the list.
       dag.getIncomingEdgesOf(irVertex).stream()
           .map(IREdge::getSrc)
-          .mapToInt(vertex -> vertex.getProperty(StageId))
+          .mapToInt(vertex -> vertex.getPropertyValue(StageIdProperty.class).get())
           .filter(n -> n != currentStageId)
           .forEach(n -> dependentStagesMap.get(currentStageId).add(n));
     });
@@ -107,17 +109,19 @@ public ScheduleGroupPass() {
     Lists.reverse(dag.getTopologicalSort()).forEach(v -> {
       // get the destination vertices of the edges that are marked as push
       final List<IRVertex> pushConnectedVertices = dag.getOutgoingEdgesOf(v).stream()
-          .filter(e -> DataFlowModelProperty.Value.Push.equals(e.getProperty(DataFlowModel)))
+          .filter(e -> DataFlowModelProperty.Value.Push.equals(e.getPropertyValue(DataFlowModelProperty.class).get()))
           .map(IREdge::getDst)
           .collect(Collectors.toList());
       if (!pushConnectedVertices.isEmpty()) { // if we need to do something,
         // we find the min value of the destination schedule groups.
         final Integer newSchedulerGroupIndex = pushConnectedVertices.stream()
-            .mapToInt(irVertex -> stageIdToScheduleGroupIndexMap.get(irVertex.<Integer>getProperty(StageId)))
+            .mapToInt(irVertex -> stageIdToScheduleGroupIndexMap
+                .get(irVertex.getPropertyValue(StageIdProperty.class).get()))
             .min().orElseThrow(() -> new RuntimeException("a list was not empty, but produced an empty result"));
         // overwrite
-        final Integer originalScheduleGroupIndex = stageIdToScheduleGroupIndexMap.get(v.<Integer>getProperty(StageId));
-        stageIdToScheduleGroupIndexMap.replace(v.getProperty(StageId), newSchedulerGroupIndex);
+        final Integer originalScheduleGroupIndex = stageIdToScheduleGroupIndexMap
+            .get(v.getPropertyValue(StageIdProperty.class).get());
+        stageIdToScheduleGroupIndexMap.replace(v.getPropertyValue(StageIdProperty.class).get(), newSchedulerGroupIndex);
         // shift those if it came too far
         if (stageIdToScheduleGroupIndexMap.values().stream()
             .noneMatch(stageIndex -> stageIndex.equals(originalScheduleGroupIndex))) { // if it doesn't exist
@@ -133,9 +137,8 @@ public ScheduleGroupPass() {
     });
 
     // do the tagging
-    dag.topologicalDo(irVertex ->
-        irVertex.setProperty(
-            ScheduleGroupIndexProperty.of(stageIdToScheduleGroupIndexMap.get(irVertex.<Integer>getProperty(StageId)))));
+    dag.topologicalDo(irVertex -> irVertex.setProperty(ScheduleGroupIndexProperty.of(
+        stageIdToScheduleGroupIndexMap.get(irVertex.getPropertyValue(StageIdProperty.class).get()))));
 
     return dag;
   }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java
index 4bd3117d..d548a1fd 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ShuffleEdgePushPass.java
@@ -19,7 +19,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 
 import java.util.Collections;
@@ -34,7 +33,7 @@
    * Default constructor.
    */
   public ShuffleEdgePushPass() {
-    super(ExecutionProperty.Key.DataFlowModel, Collections.singleton(ExecutionProperty.Key.DataCommunicationPattern));
+    super(DataFlowModelProperty.class, Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -43,7 +42,7 @@ public ShuffleEdgePushPass() {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       if (!inEdges.isEmpty()) {
         inEdges.forEach(edge -> {
-          if (edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)
+          if (edge.getPropertyValue(DataCommunicationPatternProperty.class).get()
               .equals(DataCommunicationPatternProperty.Value.Shuffle)) {
             edge.setProperty(DataFlowModelProperty.of(DataFlowModelProperty.Value.Push));
           }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java
index e6347c75..e5628ccd 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/CompositePass.java
@@ -29,7 +29,7 @@
  */
 public abstract class CompositePass implements CompileTimePass {
   private final List<CompileTimePass> passList;
-  private final Set<ExecutionProperty.Key> prerequisiteExecutionProperties;
+  private final Set<Class<? extends ExecutionProperty>> prerequisiteExecutionProperties;
 
   /**
    * Constructor.
@@ -75,7 +75,7 @@ public CompositePass(final List<CompileTimePass> passList) {
   }
 
   @Override
-  public final Set<ExecutionProperty.Key> getPrerequisiteExecutionProperties() {
+  public final Set<Class<? extends ExecutionProperty>> getPrerequisiteExecutionProperties() {
     return prerequisiteExecutionProperties;
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
index 9777ebb9..64b1060f 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
@@ -15,14 +15,15 @@
  */
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
+import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 
 import java.util.*;
 import java.util.stream.Collectors;
@@ -38,7 +39,7 @@
    * Default constructor.
    */
   public CommonSubexpressionEliminationPass() {
-    super(Collections.singleton(ExecutionProperty.Key.DataCommunicationPattern));
+    super(Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -147,9 +148,12 @@ private static void mergeAndAddToBuilder(final List<OperatorVertex> ovs, final D
             final Set<IREdge> outListToModify = outEdges.get(ov);
             outEdges.getOrDefault(ov, new HashSet<>()).forEach(e -> {
               outListToModify.remove(e);
-              final IREdge newIrEdge = new IREdge(e.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+              final IREdge newIrEdge = new IREdge(e.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                   operatorVertexToUse, e.getDst());
-              newIrEdge.setProperty(CoderProperty.of(e.getProperty(ExecutionProperty.Key.Coder)));
+              final Optional<Coder> coderProperty = e.getPropertyValue(CoderProperty.class);
+              if (coderProperty.isPresent()) {
+                newIrEdge.setProperty(CoderProperty.of(coderProperty.get()));
+              }
               outListToModify.add(newIrEdge);
             });
             outEdges.remove(ov);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
index d042584c..41392643 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/DataSkewReshapingPass.java
@@ -23,7 +23,6 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -42,7 +41,7 @@
    * Default constructor.
    */
   public DataSkewReshapingPass() {
-    super(Collections.singleton(ExecutionProperty.Key.DataCommunicationPattern));
+    super(Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -54,7 +53,7 @@ public DataSkewReshapingPass() {
       // We care about OperatorVertices that have any incoming edges that are of type Shuffle.
       if (v instanceof OperatorVertex && dag.getIncomingEdgesOf(v).stream().anyMatch(irEdge ->
           DataCommunicationPatternProperty.Value.Shuffle
-          .equals(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))) {
+          .equals(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get()))) {
         final MetricCollectionBarrierVertex<Integer, Long> metricCollectionBarrierVertex
             = new MetricCollectionBarrierVertex<>();
         metricCollectionVertices.add(metricCollectionBarrierVertex);
@@ -63,13 +62,13 @@ public DataSkewReshapingPass() {
         dag.getIncomingEdgesOf(v).forEach(edge -> {
           // we insert the metric collection vertex when we meet a shuffle edge
           if (DataCommunicationPatternProperty.Value.Shuffle
-                .equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))) {
+                .equals(edge.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
             // We then insert the dynamicOptimizationVertex between the vertex and incoming vertices.
             final IREdge newEdge = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
                 edge.getSrc(), metricCollectionBarrierVertex);
-            newEdge.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
+            newEdge.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get()));
 
-            final IREdge edgeToGbK = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+            final IREdge edgeToGbK = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                 metricCollectionBarrierVertex, v, edge.isSideInput());
             edge.copyExecutionPropertiesTo(edgeToGbK);
             builder.connectVertices(newEdge);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
index bbcc4304..c686eca3 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
@@ -16,10 +16,10 @@
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
@@ -37,7 +37,7 @@
    * Default constructor.
    */
   public LoopExtractionPass() {
-    super(Collections.singleton(ExecutionProperty.Key.DataCommunicationPattern));
+    super(Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -99,7 +99,7 @@ private Integer findMaxLoopVertexStackDepth(final DAG<IRVertex, IREdge> dag) {
                 final LoopVertex srcLoopVertex = dag.getAssignedLoopVertexOf(irEdge.getSrc());
                 srcLoopVertex.addDagOutgoingEdge(irEdge);
                 final IREdge edgeFromLoop =
-                    new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+                    new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                         srcLoopVertex, operatorVertex, irEdge.isSideInput());
                 irEdge.copyExecutionPropertiesTo(edgeFromLoop);
                 builder.connectVertices(edgeFromLoop);
@@ -147,7 +147,7 @@ private static void connectElementToLoop(final DAG<IRVertex, IREdge> dag, final
           assignedLoopVertex.getBuilder().connectVertices(irEdge);
         } else { // loop -> loop connection
           assignedLoopVertex.addDagIncomingEdge(irEdge);
-          final IREdge edgeToLoop = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+          final IREdge edgeToLoop = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
               srcLoopVertex, assignedLoopVertex, irEdge.isSideInput());
           irEdge.copyExecutionPropertiesTo(edgeToLoop);
           builder.connectVertices(edgeToLoop);
@@ -155,7 +155,7 @@ private static void connectElementToLoop(final DAG<IRVertex, IREdge> dag, final
         }
       } else { // operator -> loop
         assignedLoopVertex.addDagIncomingEdge(irEdge);
-        final IREdge edgeToLoop = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+        final IREdge edgeToLoop = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
             irEdge.getSrc(), assignedLoopVertex, irEdge.isSideInput());
         irEdge.copyExecutionPropertiesTo(edgeToLoop);
         builder.connectVertices(edgeToLoop);
@@ -226,13 +226,13 @@ private static void connectElementToLoop(final DAG<IRVertex, IREdge> dag, final
               final IRVertex equivalentSrcVertex = equivalentVertices.get(srcVertex);
 
               // add the new IREdge to the iterative incoming edges list.
-              final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+              final IREdge newIrEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                   equivalentSrcVertex, equivalentDstVertex, edge.isSideInput());
               edge.copyExecutionPropertiesTo(newIrEdge);
               finalRootLoopVertex.addIterativeIncomingEdge(newIrEdge);
             } else {
               // src is from outside the previous loop. vertex outside previous loop -> DAG.
-              final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+              final IREdge newIrEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                   srcVertex, equivalentDstVertex, edge.isSideInput());
               edge.copyExecutionPropertiesTo(newIrEdge);
               finalRootLoopVertex.addNonIterativeIncomingEdge(newIrEdge);
@@ -245,7 +245,7 @@ private static void connectElementToLoop(final DAG<IRVertex, IREdge> dag, final
             final IRVertex dstVertex = edge.getDst();
             final IRVertex equivalentSrcVertex = equivalentVertices.get(srcVertex);
 
-            final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+            final IREdge newIrEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                 equivalentSrcVertex, dstVertex, edge.isSideInput());
             edge.copyExecutionPropertiesTo(newIrEdge);
             finalRootLoopVertex.addDagOutgoingEdge(newIrEdge);
@@ -290,7 +290,7 @@ private static void addVertexToBuilder(final DAGBuilder<IRVertex, IREdge> builde
       if (edge.getSrc().equals(firstEquivalentVertex)) {
         builder.connectVertices(edge);
       } else {
-        final IREdge newIrEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+        final IREdge newIrEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
             firstEquivalentVertex, irVertex, edge.isSideInput());
         edge.copyExecutionPropertiesTo(newIrEdge);
         builder.connectVertices(newIrEdge);
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index 79f605c2..10d76e1a 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -16,12 +16,12 @@
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 
 import java.util.*;
 import java.util.function.IntPredicate;
@@ -102,7 +102,7 @@ private static void collectLoopVertices(final DAG<IRVertex, IREdge> dag,
      * Default constructor.
      */
     public LoopFusionPass() {
-      super(Collections.singleton(ExecutionProperty.Key.DataCommunicationPattern));
+      super(Collections.singleton(DataCommunicationPatternProperty.class));
     }
 
     @Override
@@ -159,8 +159,8 @@ public LoopFusionPass() {
             // inEdges.
             inEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(irEdge -> {
               if (builder.contains(irEdge.getSrc())) {
-                final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                    irEdge.getSrc(), newLoopVertex, irEdge.isSideInput());
+                final IREdge newIREdge = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class)
+                    .get(), irEdge.getSrc(), newLoopVertex, irEdge.isSideInput());
                 irEdge.copyExecutionPropertiesTo(newIREdge);
                 builder.connectVertices(newIREdge);
               }
@@ -168,8 +168,8 @@ public LoopFusionPass() {
             // outEdges.
             outEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(irEdge -> {
               if (builder.contains(irEdge.getDst())) {
-                final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
-                    newLoopVertex, irEdge.getDst(), irEdge.isSideInput());
+                final IREdge newIREdge = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class)
+                    .get(), newLoopVertex, irEdge.getDst(), irEdge.isSideInput());
                 irEdge.copyExecutionPropertiesTo(newIREdge);
                 builder.connectVertices(newIREdge);
               }
@@ -250,7 +250,7 @@ private Boolean checkEqualityOfIntPredicates(final IntPredicate predicate1, fina
      * Default constructor.
      */
     public LoopInvariantCodeMotionPass() {
-      super(Collections.singleton(ExecutionProperty.Key.DataCommunicationPattern));
+      super(Collections.singleton(DataCommunicationPatternProperty.class));
     }
 
     @Override
@@ -286,9 +286,9 @@ public LoopInvariantCodeMotionPass() {
               candidate.getValue().stream().map(IREdge::getSrc).anyMatch(edgeSrc -> edgeSrc.equals(e.getSrc())))
               .forEach(edge -> {
                 edgesToRemove.add(edge);
-                final IREdge newEdge = new IREdge(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+                final IREdge newEdge = new IREdge(edge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
                     candidate.getKey(), edge.getDst(), edge.isSideInput());
-                newEdge.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
+                newEdge.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get()));
                 edgesToAdd.add(newEdge);
               });
           final List<IREdge> listToModify = inEdges.getOrDefault(loopVertex, new ArrayList<>());
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java
index 7c02a99d..3cb5a9ab 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java
@@ -26,7 +26,7 @@
  * It is ensured by the compiler that no execution properties are modified by a ReshapingPass.
  */
 public abstract class ReshapingPass implements CompileTimePass {
-  private final Set<ExecutionProperty.Key> prerequisiteExecutionProperties;
+  private final Set<Class<? extends ExecutionProperty>> prerequisiteExecutionProperties;
 
   /**
    * Default constructor.
@@ -39,12 +39,12 @@ public ReshapingPass() {
    * Constructor.
    * @param prerequisiteExecutionProperties prerequisite of execution properties.
    */
-  public ReshapingPass(final Set<ExecutionProperty.Key> prerequisiteExecutionProperties) {
+  public ReshapingPass(final Set<Class<? extends ExecutionProperty>> prerequisiteExecutionProperties) {
     this.prerequisiteExecutionProperties = prerequisiteExecutionProperties;
   }
 
   @Override
-  public final Set<ExecutionProperty.Key> getPrerequisiteExecutionProperties() {
+  public final Set<Class<? extends ExecutionProperty>> getPrerequisiteExecutionProperties() {
     return prerequisiteExecutionProperties;
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
index 567f0fba..e899da97 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
@@ -20,7 +20,6 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.transform.RelayTransform;
@@ -39,7 +38,7 @@
    * Default constructor.
    */
   public SailfishRelayReshapingPass() {
-    super(Collections.singleton(ExecutionProperty.Key.DataCommunicationPattern));
+    super(Collections.singleton(DataCommunicationPatternProperty.class));
   }
 
   @Override
@@ -51,10 +50,10 @@ public SailfishRelayReshapingPass() {
       // has Shuffle as data communication pattern.
       if (v instanceof OperatorVertex && dag.getIncomingEdgesOf(v).stream().anyMatch(irEdge ->
               DataCommunicationPatternProperty.Value.Shuffle
-          .equals(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))) {
+          .equals(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get()))) {
         dag.getIncomingEdgesOf(v).forEach(edge -> {
           if (DataCommunicationPatternProperty.Value.Shuffle
-                .equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))) {
+                .equals(edge.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
             // Insert a merger vertex having transform that write received data immediately
             // before the vertex receiving shuffled data.
             final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
@@ -64,7 +63,7 @@ public SailfishRelayReshapingPass() {
             edge.copyExecutionPropertiesTo(newEdgeToMerger);
             final IREdge newEdgeFromMerger = new IREdge(DataCommunicationPatternProperty.Value.OneToOne,
                 iFileMergerVertex, v);
-            newEdgeFromMerger.setProperty(CoderProperty.of(edge.getProperty(ExecutionProperty.Key.Coder)));
+            newEdgeFromMerger.setProperty(CoderProperty.of(edge.getPropertyValue(CoderProperty.class).get()));
             builder.connectVertices(newEdgeToMerger);
             builder.connectVertices(newEdgeFromMerger);
           } else {
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
index 1c252899..7b40556c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
@@ -16,7 +16,13 @@
 package edu.snu.nemo.compiler.optimizer.policy;
 
 import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.CompositePass;
@@ -33,8 +39,8 @@
 public final class PolicyBuilder {
   private final List<CompileTimePass> compileTimePasses;
   private final List<RuntimePass<?>> runtimePasses;
-  private final Set<ExecutionProperty.Key> finalizedExecutionProperties;
-  private final Set<ExecutionProperty.Key> annotatedExecutionProperties;
+  private final Set<Class<? extends ExecutionProperty>> finalizedExecutionProperties;
+  private final Set<Class<? extends ExecutionProperty>> annotatedExecutionProperties;
   private final Boolean strictPrerequisiteCheckMode;
 
   /**
@@ -56,13 +62,13 @@ public PolicyBuilder(final Boolean strictPrerequisiteCheckMode) {
     this.annotatedExecutionProperties = new HashSet<>();
     this.strictPrerequisiteCheckMode = strictPrerequisiteCheckMode;
     // DataCommunicationPattern is already set when creating the IREdge itself.
-    annotatedExecutionProperties.add(ExecutionProperty.Key.DataCommunicationPattern);
+    annotatedExecutionProperties.add(DataCommunicationPatternProperty.class);
     // Some default values are already annotated.
-    annotatedExecutionProperties.add(ExecutionProperty.Key.ExecutorPlacement);
-    annotatedExecutionProperties.add(ExecutionProperty.Key.Parallelism);
-    annotatedExecutionProperties.add(ExecutionProperty.Key.DataFlowModel);
-    annotatedExecutionProperties.add(ExecutionProperty.Key.DataStore);
-    annotatedExecutionProperties.add(ExecutionProperty.Key.Partitioner);
+    annotatedExecutionProperties.add(ExecutorPlacementProperty.class);
+    annotatedExecutionProperties.add(ParallelismProperty.class);
+    annotatedExecutionProperties.add(DataFlowModelProperty.class);
+    annotatedExecutionProperties.add(DataStoreProperty.class);
+    annotatedExecutionProperties.add(PartitionerProperty.class);
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
index 93b79648..5d333049 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
@@ -17,7 +17,6 @@
 
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
@@ -44,7 +43,7 @@ public static synchronized PhysicalPlan dynamicOptimization(
           final PhysicalPlan originalPlan,
           final MetricCollectionBarrierVertex metricCollectionBarrierVertex) {
     final DynamicOptimizationProperty.Value dynamicOptimizationType =
-        metricCollectionBarrierVertex.getProperty(ExecutionProperty.Key.DynamicOptimizationType);
+        metricCollectionBarrierVertex.getPropertyValue(DynamicOptimizationProperty.class).get();
 
     switch (dynamicOptimizationType) {
       case DataSkewRuntimePass:
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index fae871a7..a11997e3 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -16,13 +16,17 @@
 package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
 import edu.snu.nemo.common.ir.vertex.*;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
 import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
 import org.apache.reef.tang.annotations.Parameter;
@@ -82,10 +86,10 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
     final Map<String, List<StageEdge>> edgeGroupToIrEdge = new HashMap<>();
 
     dagOfStages.topologicalDo(irVertex -> dagOfStages.getIncomingEdgesOf(irVertex).forEach(e -> {
-      final DuplicateEdgeGroupPropertyValue duplicateEdgeGroupProperty =
-          e.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
-      if (duplicateEdgeGroupProperty != null) {
-        final String duplicateGroupId = duplicateEdgeGroupProperty.getGroupId();
+      final Optional<DuplicateEdgeGroupPropertyValue> duplicateEdgeGroupProperty =
+          e.getPropertyValue(DuplicateEdgeGroupProperty.class);
+      if (duplicateEdgeGroupProperty.isPresent()) {
+        final String duplicateGroupId = duplicateEdgeGroupProperty.get().getGroupId();
         edgeGroupToIrEdge.computeIfAbsent(duplicateGroupId, k -> new ArrayList<>()).add(e);
       }
     }));
@@ -93,10 +97,10 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
     edgeGroupToIrEdge.forEach((id, edges) -> {
       final StageEdge representativeEdge = edges.get(0);
       final DuplicateEdgeGroupPropertyValue representativeProperty =
-          representativeEdge.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
+          representativeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class).get();
       edges.forEach(e -> {
         final DuplicateEdgeGroupPropertyValue duplicateEdgeGroupProperty =
-            e.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
+            e.getPropertyValue(DuplicateEdgeGroupProperty.class).get();
         duplicateEdgeGroupProperty.setRepresentativeEdgeId(representativeEdge.getId());
         duplicateEdgeGroupProperty.setGroupSize(representativeProperty.getGroupSize());
       });
@@ -114,7 +118,7 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
 
     final Map<Integer, List<IRVertex>> vertexListForEachStage = new LinkedHashMap<>();
     irDAG.topologicalDo(irVertex -> {
-      final Integer stageNum = irVertex.getProperty(ExecutionProperty.Key.StageId);
+      final Integer stageNum = irVertex.getPropertyValue(StageIdProperty.class).get();
       if (!vertexListForEachStage.containsKey(stageNum)) {
         vertexListForEachStage.put(stageNum, new ArrayList<>());
       }
@@ -133,13 +137,13 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
       final IRVertex irVertexOfNewStage = stageVertices.stream().findAny()
           .orElseThrow(() -> new RuntimeException("Error: List " + stageVertices.getClass() + " is Empty"));
       final StageBuilder stageBuilder = new StageBuilder(
-          irVertexOfNewStage.getProperty(ExecutionProperty.Key.StageId),
-          irVertexOfNewStage.getProperty(ExecutionProperty.Key.Parallelism),
-          irVertexOfNewStage.getProperty(ExecutionProperty.Key.ScheduleGroupIndex),
-          irVertexOfNewStage.getProperty(ExecutionProperty.Key.ExecutorPlacement));
+          irVertexOfNewStage.getPropertyValue(StageIdProperty.class).get(),
+          irVertexOfNewStage.getPropertyValue(ParallelismProperty.class).get(),
+          irVertexOfNewStage.getPropertyValue(ScheduleGroupIndexProperty.class).get(),
+          irVertexOfNewStage.getPropertyValue(ExecutorPlacementProperty.class).get());
 
       // Prepare useful variables.
-      final int stageParallelism = irVertexOfNewStage.getProperty(ExecutionProperty.Key.Parallelism);
+      final int stageParallelism = irVertexOfNewStage.getPropertyValue(ParallelismProperty.class).get();
       final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
       for (int i = 0; i < stageParallelism; i++) {
         vertexIdToReadables.add(new HashMap<>());
@@ -237,9 +241,9 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
    */
   private void integrityCheck(final List<IRVertex> stageVertices) {
     final IRVertex firstVertex = stageVertices.get(0);
-    final String placement = firstVertex.getProperty(ExecutionProperty.Key.ExecutorPlacement);
-    final int scheduleGroup = firstVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex);
-    final int parallelism = firstVertex.<Integer>getProperty(ExecutionProperty.Key.Parallelism);
+    final String placement = firstVertex.getPropertyValue(ExecutorPlacementProperty.class).get();
+    final int scheduleGroup = firstVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
+    final int parallelism = firstVertex.getPropertyValue(ParallelismProperty.class).get();
 
     stageVertices.forEach(irVertex -> {
       // Check vertex type.
@@ -251,9 +255,9 @@ private void integrityCheck(final List<IRVertex> stageVertices) {
 
       // Check execution properties.
       if ((placement != null
-          && !placement.equals(irVertex.<String>getProperty(ExecutionProperty.Key.ExecutorPlacement)))
-          || scheduleGroup != irVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex)
-          || parallelism != irVertex.<Integer>getProperty(ExecutionProperty.Key.Parallelism)) {
+          && !placement.equals(irVertex.getPropertyValue(ExecutorPlacementProperty.class).get()))
+          || scheduleGroup != irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get()
+          || parallelism != irVertex.getPropertyValue(ParallelismProperty.class).get()) {
         throw new RuntimeException("Vertices of the same stage have different execution properties: "
             + irVertex.getId());
       }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
index e1a11acc..e4bd7eb0 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
@@ -17,8 +17,11 @@
 
 import edu.snu.nemo.common.dag.Edge;
 import edu.snu.nemo.common.dag.Vertex;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+
+import java.io.Serializable;
+import java.util.Optional;
 
 /**
  * Represents the edge between vertices in a logical/physical plan in runtime.
@@ -54,7 +57,8 @@ public RuntimeEdge(final String runtimeEdgeId,
    * @param executionPropertyKey key of the execution property.
    * @return the execution property.
    */
-  public final <T> T getProperty(final ExecutionProperty.Key executionPropertyKey) {
+  public final <T extends Serializable> Optional<T> getPropertyValue(
+      final Class<? extends EdgeExecutionProperty<T>> executionPropertyKey) {
     return executionProperties.get(executionPropertyKey);
   }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index b52a17f5..c9d3cc2d 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -17,7 +17,8 @@
 
 import com.google.protobuf.ByteString;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.IllegalMessageException;
@@ -107,12 +108,15 @@ private void launchTask(final Task task) {
           new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
 
       task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
-          e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
+          e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class)
+              .orElse(null)));
       task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
-          e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
+          e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class).
+              orElse(null)));
       irDag.getVertices().forEach(v -> {
         irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
-            e.getProperty(ExecutionProperty.Key.Coder), e.getExecutionProperties()));
+            e.getPropertyValue(CoderProperty.class).get(), e.getPropertyValue(CompressionProperty.class)
+                .orElse(null)));
       });
 
       new TaskExecutor(
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
index 9d3920ae..46c46092 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/SerializerManager.java
@@ -19,8 +19,6 @@
 import edu.snu.nemo.runtime.executor.data.streamchainer.StreamChainer;
 import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,11 +49,22 @@ public SerializerManager() {
    *
    * @param runtimeEdgeId id of the runtime edge.
    * @param coder         the corresponding coder.
-   * @param propertyMap   ExecutionPropertyMap of runtime edge
+   */
+  public void register(final String runtimeEdgeId,
+                       final Coder coder) {
+    register(runtimeEdgeId, coder, null);
+  }
+
+  /**
+   * Register a coder for runtime edge.
+   *
+   * @param runtimeEdgeId id of the runtime edge.
+   * @param coder         the corresponding coder.
+   * @param compressionProperty   compression property, or null not to enable compression
    */
   public void register(final String runtimeEdgeId,
                        final Coder coder,
-                       final ExecutionPropertyMap propertyMap) {
+                       final CompressionProperty.Value compressionProperty) {
     LOG.debug("{} edge id registering to SerializerManager", runtimeEdgeId);
     final Serializer serializer = new Serializer(coder, Collections.emptyList());
     runtimeEdgeIdToSerializer.putIfAbsent(runtimeEdgeId, serializer);
@@ -63,7 +72,6 @@ public void register(final String runtimeEdgeId,
     final List<StreamChainer> streamChainerList = new ArrayList<>();
 
     // Compression chain
-    CompressionProperty.Compression compressionProperty = propertyMap.get(ExecutionProperty.Key.Compression);
     if (compressionProperty != null) {
       LOG.debug("Adding {} compression chain for {}",
           compressionProperty, runtimeEdgeId);
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
index d3218791..c5c2e927 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/streamchainer/CompressionStreamChainer.java
@@ -30,14 +30,14 @@
  * {@link StreamChainer} for applying compression.
  */
 public class CompressionStreamChainer implements StreamChainer {
-  private final CompressionProperty.Compression compression;
+  private final CompressionProperty.Value compression;
 
   /**
    * Constructor.
    *
    * @param compression compression method.
    */
-  public CompressionStreamChainer(final CompressionProperty.Compression compression) {
+  public CompressionStreamChainer(final CompressionProperty.Value compression) {
     this.compression = compression;
   }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 91298269..fdaecd54 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -18,9 +18,10 @@
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.data.KeyRange;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -68,15 +69,14 @@ public InputReader(final int dstTaskIndex,
    * @return the read data.
    */
   public List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read() {
-    DataCommunicationPatternProperty.Value comValue =
-        (DataCommunicationPatternProperty.Value)
-            runtimeEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern);
+    final Optional<DataCommunicationPatternProperty.Value> comValue =
+            runtimeEdge.getPropertyValue(DataCommunicationPatternProperty.class);
 
-    if (comValue.equals(DataCommunicationPatternProperty.Value.OneToOne)) {
+    if (comValue.get().equals(DataCommunicationPatternProperty.Value.OneToOne)) {
       return Collections.singletonList(readOneToOne());
-    } else if (comValue.equals(DataCommunicationPatternProperty.Value.BroadCast)) {
+    } else if (comValue.get().equals(DataCommunicationPatternProperty.Value.BroadCast)) {
       return readBroadcast();
-    } else if (comValue.equals(DataCommunicationPatternProperty.Value.Shuffle)) {
+    } else if (comValue.get().equals(DataCommunicationPatternProperty.Value.Shuffle)) {
       // If the dynamic optimization which detects data skew is enabled, read the data in the assigned range.
       // TODO #492: Modularize the data communication pattern.
       return readDataInRange();
@@ -87,20 +87,18 @@ public InputReader(final int dstTaskIndex,
 
   private CompletableFuture<DataUtil.IteratorWithNumBytes> readOneToOne() {
     final String blockId = getBlockId(dstTaskIndex);
-    return blockManagerWorker.queryBlock(blockId, getId(),
-        (DataStoreProperty.Value) runtimeEdge.getProperty(ExecutionProperty.Key.DataStore),
-        HashRange.all());
+    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+    return blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all());
   }
 
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
     final int numSrcTasks = this.getSourceParallelism();
+    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
 
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
       final String blockId = getBlockId(srcTaskIdx);
-      futures.add(blockManagerWorker.queryBlock(blockId, getId(),
-          (DataStoreProperty.Value) runtimeEdge.getProperty(ExecutionProperty.Key.DataStore),
-          HashRange.all()));
+      futures.add(blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all()));
     }
 
     return futures;
@@ -113,6 +111,7 @@ public InputReader(final int dstTaskIndex,
    */
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
     assert (runtimeEdge instanceof StageEdge);
+    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
     final KeyRange hashRangeToRead =
         ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
     if (hashRangeToRead == null) {
@@ -125,9 +124,7 @@ public InputReader(final int dstTaskIndex,
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
       final String blockId = getBlockId(srcTaskIdx);
       futures.add(
-          blockManagerWorker.queryBlock(blockId, getId(),
-              (DataStoreProperty.Value) runtimeEdge.getProperty(ExecutionProperty.Key.DataStore),
-              hashRangeToRead));
+          blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), hashRangeToRead));
     }
 
     return futures;
@@ -144,12 +141,12 @@ public RuntimeEdge getRuntimeEdge() {
    * @return the block id
    */
   private String getBlockId(final int taskIdx) {
-    final DuplicateEdgeGroupPropertyValue duplicateDataProperty =
-        (DuplicateEdgeGroupPropertyValue) runtimeEdge.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
-    if (duplicateDataProperty == null || duplicateDataProperty.getGroupSize() <= 1) {
+    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
+        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+    if (!duplicateDataProperty.isPresent() || duplicateDataProperty.get().getGroupSize() <= 1) {
       return RuntimeIdGenerator.generateBlockId(getId(), taskIdx);
     }
-    final String duplicateEdgeId = duplicateDataProperty.getRepresentativeEdgeId();
+    final String duplicateEdgeId = duplicateDataProperty.get().getRepresentativeEdgeId();
     return RuntimeIdGenerator.generateBlockId(duplicateEdgeId, taskIdx);
   }
 
@@ -168,10 +165,10 @@ public boolean isSideInputReader() {
    */
   public int getSourceParallelism() {
     if (DataCommunicationPatternProperty.Value.OneToOne
-        .equals(runtimeEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))) {
+        .equals(runtimeEdge.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
       return 1;
     } else {
-      final Integer numSrcTasks = srcVertex.getProperty(ExecutionProperty.Key.Parallelism);
+      final Integer numSrcTasks = srcVertex.getPropertyValue(ParallelismProperty.class).get();
       return numSrcTasks;
     }
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 8f686d48..b0dd0800 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -19,7 +19,7 @@
 import edu.snu.nemo.common.exception.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -65,22 +65,22 @@ public OutputWriter(final int hashRangeMultiplier,
     this.srcVertexId = srcRuntimeVertexId;
     this.dstIrVertex = dstIrVertex;
     this.blockManagerWorker = blockManagerWorker;
-    this.blockStoreValue = runtimeEdge.getProperty(ExecutionProperty.Key.DataStore);
+    this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).get();
 
     // Setup partitioner
     final int dstParallelism = getDstParallelism();
-    final KeyExtractor keyExtractor = runtimeEdge.getProperty(ExecutionProperty.Key.KeyExtractor);
+    final Optional<KeyExtractor> keyExtractor = runtimeEdge.getPropertyValue(KeyExtractorProperty.class);
     final PartitionerProperty.Value partitionerPropertyValue =
-        runtimeEdge.getProperty(ExecutionProperty.Key.Partitioner);
+        runtimeEdge.getPropertyValue(PartitionerProperty.class).get();
     switch (partitionerPropertyValue) {
       case IntactPartitioner:
         this.partitioner = new IntactPartitioner();
         break;
       case HashPartitioner:
-        this.partitioner = new HashPartitioner(dstParallelism, keyExtractor);
+        this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.get());
         break;
       case DataSkewHashPartitioner:
-        this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor);
+        this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.get());
         break;
       default:
         throw new UnsupportedPartitionerException(
@@ -88,11 +88,11 @@ public OutputWriter(final int hashRangeMultiplier,
     }
     blockToWrite = blockManagerWorker.createBlock(blockId, blockStoreValue);
 
-    final DuplicateEdgeGroupPropertyValue duplicateDataProperty =
-        runtimeEdge.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
-    nonDummyBlock = duplicateDataProperty == null
-        || duplicateDataProperty.getRepresentativeEdgeId().equals(runtimeEdge.getId())
-        || duplicateDataProperty.getGroupSize() <= 1;
+    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
+        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+    nonDummyBlock = !duplicateDataProperty.isPresent()
+        || duplicateDataProperty.get().getRepresentativeEdgeId().equals(runtimeEdge.getId())
+        || duplicateDataProperty.get().getGroupSize() <= 1;
   }
 
   /**
@@ -113,13 +113,13 @@ public void write(final Object element) {
   public void close() {
     // Commit block.
     final UsedDataHandlingProperty.Value usedDataHandling =
-        runtimeEdge.getProperty(ExecutionProperty.Key.UsedDataHandling);
-    final DuplicateEdgeGroupPropertyValue duplicateDataProperty =
-        runtimeEdge.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
-    final int multiplier = duplicateDataProperty == null ? 1 : duplicateDataProperty.getGroupSize();
+        runtimeEdge.getPropertyValue(UsedDataHandlingProperty.class).get();
+    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
+        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+    final int multiplier = duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
 
-    final boolean isDataSizeMetricCollectionEdge = MetricCollectionProperty.Value.DataSkewRuntimePass
-        .equals(runtimeEdge.getProperty(ExecutionProperty.Key.MetricCollection));
+    final boolean isDataSizeMetricCollectionEdge = Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
+        .equals(runtimeEdge.getPropertyValue(MetricCollectionProperty.class));
     final Optional<Map<Integer, Long>> partitionSizeMap = blockToWrite.commit();
     // Return the total size of the committed block.
     if (partitionSizeMap.isPresent()) {
@@ -155,7 +155,7 @@ public void close() {
    */
   private int getDstParallelism() {
     return DataCommunicationPatternProperty.Value.OneToOne.equals(
-        runtimeEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))
-        ? 1 : dstIrVertex.getProperty(ExecutionProperty.Key.Parallelism);
+        runtimeEdge.getPropertyValue(DataCommunicationPatternProperty.class).get())
+        ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).get();
   }
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 6bac9352..48ee45ed 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -73,7 +73,7 @@
   private static final String TMP_FILE_DIRECTORY = "./tmpFiles";
   private static final Coder CODER = PairCoder.of(IntCoder.of(), IntCoder.of());
   private static final Serializer SERIALIZER = new Serializer(CODER,
-      Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Compression.LZ4)));
+      Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Value.LZ4)));
   private static final SerializerManager serializerManager = mock(SerializerManager.class);
   private BlockManagerMaster blockManagerMaster;
   private LocalMessageDispatcher messageDispatcher;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index a0252cac..012701cf 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -20,7 +20,6 @@
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
@@ -405,9 +404,10 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
     edgeProperties.put(DataCommunicationPatternProperty.of(commPattern));
     edgeProperties.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
     edgeProperties.put(DuplicateEdgeGroupProperty.of(new DuplicateEdgeGroupPropertyValue("dummy")));
-    final DuplicateEdgeGroupPropertyValue duplicateDataProperty = edgeProperties.get(ExecutionProperty.Key.DuplicateEdgeGroup);
-    duplicateDataProperty.setRepresentativeEdgeId(edgeId);
-    duplicateDataProperty.setGroupSize(2);
+    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty
+        = edgeProperties.get(DuplicateEdgeGroupProperty.class);
+    duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
+    duplicateDataProperty.get().setGroupSize(2);
 
     edgeProperties.put(DataStoreProperty.of(store));
     edgeProperties.put(UsedDataHandlingProperty.of(UsedDataHandlingProperty.Value.Keep));
@@ -523,8 +523,8 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
   private Pair<IRVertex, IRVertex> setupVertices(final String edgeId,
                                                  final BlockManagerWorker sender,
                                                  final BlockManagerWorker receiver) {
-    serializerManagers.get(sender).register(edgeId, CODER, new ExecutionPropertyMap(""));
-    serializerManagers.get(receiver).register(edgeId, CODER, new ExecutionPropertyMap(""));
+    serializerManagers.get(sender).register(edgeId, CODER);
+    serializerManagers.get(receiver).register(edgeId, CODER);
 
     // Src setup
     final SourceVertex srcVertex = new EmptyComponents.EmptySourceVertex("Source");
@@ -543,10 +543,10 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
                                                  final String edgeId2,
                                                  final BlockManagerWorker sender,
                                                  final BlockManagerWorker receiver) {
-    serializerManagers.get(sender).register(edgeId, CODER, new ExecutionPropertyMap(""));
-    serializerManagers.get(receiver).register(edgeId, CODER, new ExecutionPropertyMap(""));
-    serializerManagers.get(sender).register(edgeId2, CODER, new ExecutionPropertyMap(""));
-    serializerManagers.get(receiver).register(edgeId2, CODER, new ExecutionPropertyMap(""));
+    serializerManagers.get(sender).register(edgeId, CODER);
+    serializerManagers.get(receiver).register(edgeId, CODER);
+    serializerManagers.get(sender).register(edgeId2, CODER);
+    serializerManagers.get(receiver).register(edgeId2, CODER);
 
     // Src setup
     final SourceVertex srcVertex = new EmptyComponents.EmptySourceVertex("Source");
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
index 52e4df49..b403f7b5 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultEdgeCoderPassTest.java
@@ -19,7 +19,7 @@
 import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultEdgeCoderPass;
@@ -48,19 +48,19 @@ public void setUp() throws Exception {
   @Test
   public void testAnnotatingPass() {
     final AnnotatingPass coderPass = new DefaultEdgeCoderPass();
-    assertEquals(ExecutionProperty.Key.Coder, coderPass.getExecutionPropertyToModify());
+    assertEquals(CoderProperty.class, coderPass.getExecutionPropertyToModify());
   }
 
   @Test
   public void testNotOverride() {
     // Get the first coder from the compiled DAG
     final Coder compiledCoder = compiledDAG
-        .getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+        .getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0)).get(0).getPropertyValue(CoderProperty.class).get();
     final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
 
     // Get the first coder from the processed DAG
-    final Coder processedCoder = processedDAG
-        .getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+    final Coder processedCoder = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0))
+        .get(0).getPropertyValue(CoderProperty.class).get();
     assertEquals(compiledCoder, processedCoder); // It must not be changed.
   }
 
@@ -68,12 +68,12 @@ public void testNotOverride() {
   public void testSetToDefault() throws Exception {
     // Remove the first coder from the compiled DAG (to let our pass to set as default coder).
     compiledDAG.getOutgoingEdgesOf(compiledDAG.getTopologicalSort().get(0))
-        .get(0).getExecutionProperties().remove(ExecutionProperty.Key.Coder);
+        .get(0).getExecutionProperties().remove(CoderProperty.class);
     final DAG<IRVertex, IREdge> processedDAG = new DefaultEdgeCoderPass().apply(compiledDAG);
 
     // Check whether the pass set the empty coder to our default coder.
-    final Coder processedCoder = processedDAG
-        .getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0)).get(0).getProperty(ExecutionProperty.Key.Coder);
+    final Coder processedCoder = processedDAG.getOutgoingEdgesOf(processedDAG.getTopologicalSort().get(0))
+        .get(0).getPropertyValue(CoderProperty.class).get();
     assertEquals(Coder.DUMMY_CODER, processedCoder);
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
index e1d1a054..105eeb8f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPassTest.java
@@ -18,9 +18,9 @@
 import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultParallelismPass;
 import edu.snu.nemo.tests.compiler.CompilerTestUtil;
@@ -48,25 +48,25 @@ public void setUp() throws Exception {
   @Test
   public void testAnnotatingPass() {
     final AnnotatingPass parallelismPass = new DefaultParallelismPass();
-    assertEquals(ExecutionProperty.Key.Parallelism, parallelismPass.getExecutionPropertyToModify());
+    assertEquals(ParallelismProperty.class, parallelismPass.getExecutionPropertyToModify());
   }
 
   @Test
-  public void testParallelismOne() throws Exception {
+  public void testParallelismOne() {
     final DAG<IRVertex, IREdge> processedDAG = new DefaultParallelismPass().apply(compiledDAG);
 
     processedDAG.getTopologicalSort().forEach(irVertex ->
-        assertEquals(1, irVertex.<Integer>getProperty(ExecutionProperty.Key.Parallelism).longValue()));
+        assertEquals(1, irVertex.getPropertyValue(ParallelismProperty.class).get().longValue()));
   }
 
   @Test
-  public void testParallelismTen() throws Exception {
+  public void testParallelismTen() {
     final int desiredSourceParallelism = 10;
     final DAG<IRVertex, IREdge> processedDAG = new DefaultParallelismPass(desiredSourceParallelism, 2).apply(compiledDAG);
 
     processedDAG.getTopologicalSort().stream()
         .filter(irVertex -> irVertex instanceof SourceVertex)
         .forEach(irVertex -> assertEquals(desiredSourceParallelism,
-            irVertex.<Integer>getProperty(ExecutionProperty.Key.Parallelism).longValue()));
+            irVertex.getPropertyValue(ParallelismProperty.class).get().longValue()));
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java
index a6f0e3df..9e222fcc 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/ScheduleGroupPassTest.java
@@ -18,8 +18,8 @@
 import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
@@ -47,7 +47,7 @@ public void setUp() throws Exception {
   @Test
   public void testAnnotatingPass() {
     final AnnotatingPass scheduleGroupPass = new ScheduleGroupPass();
-    assertEquals(ExecutionProperty.Key.ScheduleGroupIndex, scheduleGroupPass.getExecutionPropertyToModify());
+    assertEquals(ScheduleGroupIndexProperty.class, scheduleGroupPass.getExecutionPropertyToModify());
   }
 
   /**
@@ -60,10 +60,9 @@ public void testScheduleGroupPass() throws Exception {
         new TestPolicy(), "");
 
     for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
-      assertTrue(irVertex.getProperty(ExecutionProperty.Key.ScheduleGroupIndex) != null);
-      final Integer currentScheduleGroupIndex = irVertex.getProperty(ExecutionProperty.Key.ScheduleGroupIndex);
+      final Integer currentScheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
       final Integer largestScheduleGroupIndexOfParent = processedDAG.getParents(irVertex.getId()).stream()
-          .mapToInt(v -> v.getProperty(ExecutionProperty.Key.ScheduleGroupIndex))
+          .mapToInt(v -> v.getPropertyValue(ScheduleGroupIndexProperty.class).get())
           .max().orElse(0);
       assertTrue(currentScheduleGroupIndex >= largestScheduleGroupIndexOfParent);
     }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DataSkewCompositePassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DataSkewCompositePassTest.java
index b29ed414..50b3cdba 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DataSkewCompositePassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DataSkewCompositePassTest.java
@@ -36,6 +36,7 @@
 
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -62,7 +63,7 @@ public void testCompositePass() {
     final CompositePass dataSkewPass = new DataSkewCompositePass();
     assertEquals(NUM_OF_PASSES_IN_DATA_SKEW_PASS, dataSkewPass.getPassList().size());
 
-    final Set<ExecutionProperty.Key> prerequisites = new HashSet<>();
+    final Set<Class<? extends ExecutionProperty>> prerequisites = new HashSet<>();
     dataSkewPass.getPassList().forEach(compileTimePass ->
         prerequisites.addAll(compileTimePass.getPrerequisiteExecutionProperties()));
     dataSkewPass.getPassList().forEach(compileTimePass -> {
@@ -85,7 +86,7 @@ public void testDataSkewPass() throws Exception {
     final Long numOfShuffleGatherEdges = mrDAG.getVertices().stream().filter(irVertex ->
         mrDAG.getIncomingEdgesOf(irVertex).stream().anyMatch(irEdge ->
             DataCommunicationPatternProperty.Value.Shuffle
-            .equals(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))))
+            .equals(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get())))
         .count();
     final DAG<IRVertex, IREdge> processedDAG = new DataSkewCompositePass().apply(mrDAG);
 
@@ -93,14 +94,14 @@ public void testDataSkewPass() throws Exception {
     processedDAG.getVertices().stream().map(processedDAG::getIncomingEdgesOf)
         .flatMap(List::stream)
         .filter(irEdge -> DataCommunicationPatternProperty.Value.Shuffle
-            .equals(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))
+            .equals(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get()))
         .map(IREdge::getSrc)
         .forEach(irVertex -> assertTrue(irVertex instanceof MetricCollectionBarrierVertex));
 
     processedDAG.getVertices().forEach(v -> processedDAG.getOutgoingEdgesOf(v).stream()
-        .filter(e -> MetricCollectionProperty.Value.DataSkewRuntimePass
-                  .equals(e.getProperty(ExecutionProperty.Key.MetricCollection)))
-        .forEach(e -> assertEquals(e.getProperty(ExecutionProperty.Key.Partitioner),
-            PartitionerProperty.Value.DataSkewHashPartitioner)));
+        .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
+                  .equals(e.getPropertyValue(MetricCollectionProperty.class)))
+        .forEach(e -> assertEquals(PartitionerProperty.Value.DataSkewHashPartitioner,
+            e.getPropertyValue(PartitionerProperty.class).get())));
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
index b0f65eba..898f74ec 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DisaggregationPassTest.java
@@ -20,8 +20,8 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.StageIdProperty;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultParallelismPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DisaggregationEdgeDataStorePass;
@@ -59,13 +59,13 @@ public void testDisaggregation() throws Exception {
     processedDAG.getTopologicalSort().forEach(irVertex -> {
       processedDAG.getIncomingEdgesOf(irVertex).forEach(edgeToMerger -> {
         if (DataCommunicationPatternProperty.Value.OneToOne
-            .equals(edgeToMerger.getProperty(ExecutionProperty.Key.DataCommunicationPattern))
-            && edgeToMerger.getSrc().getProperty(ExecutionProperty.Key.StageId)
-            .equals(edgeToMerger.getDst().getProperty(ExecutionProperty.Key.StageId))) {
-          assertEquals(DataStoreProperty.Value.MemoryStore, edgeToMerger.getProperty(ExecutionProperty.Key.DataStore));
+            .equals(edgeToMerger.getPropertyValue(DataCommunicationPatternProperty.class).get())
+            && edgeToMerger.getSrc().getPropertyValue(StageIdProperty.class).get()
+            .equals(edgeToMerger.getDst().getPropertyValue(StageIdProperty.class).get())) {
+          assertEquals(DataStoreProperty.Value.MemoryStore, edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
         } else {
           assertEquals(DataStoreProperty.Value.GlusterFileStore,
-              edgeToMerger.getProperty(ExecutionProperty.Key.DataStore));
+              edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
         }
       });
     });
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
index 5f29ed2c..73014e28 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/PadoCompositePassTest.java
@@ -20,7 +20,6 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.PadoEdgeDataStorePass;
@@ -53,41 +52,41 @@ public void testPadoPass() throws Exception {
     final DAG<IRVertex, IREdge> processedDAG = new PadoCompositePass().apply(compiledDAG);
 
     final IRVertex vertex1 = processedDAG.getTopologicalSort().get(0);
-    assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex1.getProperty(ExecutionProperty.Key.ExecutorPlacement));
+    assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex1.getPropertyValue(ExecutorPlacementProperty.class).get());
 
     final IRVertex vertex5 = processedDAG.getTopologicalSort().get(1);
-    assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex5.getProperty(ExecutionProperty.Key.ExecutorPlacement));
+    assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex5.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex5).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getProperty(ExecutionProperty.Key.DataStore));
-      assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getProperty(ExecutionProperty.Key.DataFlowModel));
+      assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex6 = processedDAG.getTopologicalSort().get(2);
-    assertEquals(ExecutorPlacementProperty.RESERVED, vertex6.getProperty(ExecutionProperty.Key.ExecutorPlacement));
+    assertEquals(ExecutorPlacementProperty.RESERVED, vertex6.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex6).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getProperty(ExecutionProperty.Key.DataStore));
-      assertEquals(DataFlowModelProperty.Value.Push, irEdge.getProperty(ExecutionProperty.Key.DataFlowModel));
+      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(DataFlowModelProperty.Value.Push, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex4 = processedDAG.getTopologicalSort().get(6);
-    assertEquals(ExecutorPlacementProperty.RESERVED, vertex4.getProperty(ExecutionProperty.Key.ExecutorPlacement));
+    assertEquals(ExecutorPlacementProperty.RESERVED, vertex4.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex4).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getProperty(ExecutionProperty.Key.DataStore));
-      assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getProperty(ExecutionProperty.Key.DataFlowModel));
+      assertEquals(DataStoreProperty.Value.MemoryStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex12 = processedDAG.getTopologicalSort().get(10);
-    assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex12.getProperty(ExecutionProperty.Key.ExecutorPlacement));
+    assertEquals(ExecutorPlacementProperty.TRANSIENT, vertex12.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex12).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getProperty(ExecutionProperty.Key.DataStore));
-      assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getProperty(ExecutionProperty.Key.DataFlowModel));
+      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
 
     final IRVertex vertex14 = processedDAG.getTopologicalSort().get(12);
-    assertEquals(ExecutorPlacementProperty.RESERVED, vertex14.getProperty(ExecutionProperty.Key.ExecutorPlacement));
+    assertEquals(ExecutorPlacementProperty.RESERVED, vertex14.getPropertyValue(ExecutorPlacementProperty.class).get());
     processedDAG.getIncomingEdgesOf(vertex14).forEach(irEdge -> {
-      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getProperty(ExecutionProperty.Key.DataStore));
-      assertEquals(DataFlowModelProperty.Value.Push, irEdge.getProperty(ExecutionProperty.Key.DataFlowModel));
+      assertEquals(DataStoreProperty.Value.LocalFileStore, irEdge.getPropertyValue(DataStoreProperty.class).get());
+      assertEquals(DataFlowModelProperty.Value.Push, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
     });
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
index 80a8311b..cd456fef 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
@@ -22,7 +22,6 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.UsedDataHandlingProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.SailfishPass;
 import edu.snu.nemo.tests.compiler.CompilerTestUtil;
@@ -48,40 +47,40 @@ public void setUp() throws Exception {
   }
 
   @Test
-  public void testSailfish() throws Exception {
+  public void testSailfish() {
     final DAG<IRVertex, IREdge> processedDAG = new SailfishPass().apply(compiledDAG);
 
     processedDAG.getTopologicalSort().forEach(irVertex -> {
       if (processedDAG.getIncomingEdgesOf(irVertex).stream().anyMatch(irEdge ->
               DataCommunicationPatternProperty.Value.Shuffle
-          .equals(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))) {
+          .equals(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get()))) {
         // Merger vertex
         processedDAG.getIncomingEdgesOf(irVertex).forEach(edgeToMerger -> {
           if (DataCommunicationPatternProperty.Value.Shuffle
-          .equals(edgeToMerger.getProperty(ExecutionProperty.Key.DataCommunicationPattern))) {
+          .equals(edgeToMerger.getPropertyValue(DataCommunicationPatternProperty.class).get())) {
             assertEquals(DataFlowModelProperty.Value.Push,
-                edgeToMerger.getProperty(ExecutionProperty.Key.DataFlowModel));
+                edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
             assertEquals(UsedDataHandlingProperty.Value.Discard,
-                edgeToMerger.getProperty(ExecutionProperty.Key.UsedDataHandling));
+                edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get());
             assertEquals(DataStoreProperty.Value.SerializedMemoryStore,
-                edgeToMerger.getProperty(ExecutionProperty.Key.DataStore));
+                edgeToMerger.getPropertyValue(DataStoreProperty.class).get());
           } else {
             assertEquals(DataFlowModelProperty.Value.Pull,
-                edgeToMerger.getProperty(ExecutionProperty.Key.DataFlowModel));
+                edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
           }
         });
         processedDAG.getOutgoingEdgesOf(irVertex).forEach(edgeFromMerger -> {
           assertEquals(DataFlowModelProperty.Value.Pull,
-              edgeFromMerger.getProperty(ExecutionProperty.Key.DataFlowModel));
+              edgeFromMerger.getPropertyValue(DataFlowModelProperty.class).get());
           assertEquals(DataCommunicationPatternProperty.Value.OneToOne,
-              edgeFromMerger.getProperty(ExecutionProperty.Key.DataCommunicationPattern));
+              edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get());
           assertEquals(DataStoreProperty.Value.LocalFileStore,
-              edgeFromMerger.getProperty(ExecutionProperty.Key.DataStore));
+              edgeFromMerger.getPropertyValue(DataStoreProperty.class).get());
         });
       } else {
         // Non merger vertex.
         processedDAG.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
-          assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getProperty(ExecutionProperty.Key.DataFlowModel));
+          assertEquals(DataFlowModelProperty.Value.Pull, irEdge.getPropertyValue(DataFlowModelProperty.class).get());
         });
       }
     });
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
index 302ed4f8..8a6b1a85 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopFusionPassTest.java
@@ -19,8 +19,8 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping.LoopExtractionPass;
@@ -112,15 +112,15 @@ private static void addLoopVertexToBuilder(final DAGBuilder<IRVertex, IREdge> bu
                                              final LoopVertex loopVertexToFollow) {
     builder.addVertex(loopVertexToFollow);
     loopVertexToFollow.getIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
-      final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+      final IREdge newIREdge = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
           vertexToBeFollowed, loopVertexToFollow);
-      newIREdge.setProperty(CoderProperty.of(irEdge.getProperty(ExecutionProperty.Key.Coder)));
+      newIREdge.setProperty(CoderProperty.of(irEdge.getPropertyValue(CoderProperty.class).get()));
       builder.connectVertices(newIREdge);
     }));
     loopVertexToFollow.getNonIterativeIncomingEdges().values().forEach(irEdges -> irEdges.forEach(irEdge -> {
-      final IREdge newIREdge = new IREdge(irEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern),
+      final IREdge newIREdge = new IREdge(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get(),
           irEdge.getSrc(), loopVertexToFollow);
-      newIREdge.setProperty(CoderProperty.of(irEdge.getProperty(ExecutionProperty.Key.Coder)));
+      newIREdge.setProperty(CoderProperty.of(irEdge.getPropertyValue(CoderProperty.class).get()));
       builder.connectVertices(newIREdge);
     }));
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index ca551b02..de6ff56f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -19,8 +19,8 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.CoderProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping.LoopExtractionPass;
@@ -90,9 +90,9 @@ public void setUp() throws Exception {
           } else {
             final Optional<IREdge> incomingEdge = newDAGIncomingEdge.stream().findFirst();
             assertTrue(incomingEdge.isPresent());
-            final IREdge newIREdge = new IREdge(incomingEdge.get().getProperty(
-                ExecutionProperty.Key.DataCommunicationPattern), incomingEdge.get().getSrc(), alsLoop);
-            newIREdge.setProperty(CoderProperty.of(incomingEdge.get().getProperty(ExecutionProperty.Key.Coder)));
+            final IREdge newIREdge = new IREdge(incomingEdge.get().getPropertyValue(
+                DataCommunicationPatternProperty.class).get(), incomingEdge.get().getSrc(), alsLoop);
+            newIREdge.setProperty(CoderProperty.of(incomingEdge.get().getPropertyValue(CoderProperty.class).get()));
             builder.connectVertices(newIREdge);
           }
         });


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services