You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by sa...@apache.org on 2022/05/07 15:30:32 UTC

[incubator-heron] branch master updated: [3821] Remove deprecated Host Path, NFS, and EBS support for Kubernetes (#3825)

This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new dc2d9f675a8 [3821] Remove deprecated Host Path, NFS, and EBS support for Kubernetes (#3825)
dc2d9f675a8 is described below

commit dc2d9f675a85effcaa74cba6c8fff8451597e146
Author: Saad Ur Rahman <su...@users.noreply.github.com>
AuthorDate: Sat May 7 11:30:27 2022 -0400

    [3821] Remove deprecated Host Path, NFS, and EBS support for Kubernetes (#3825)
    
    * Removed deprecated loading of EBS, NFS, and Host Path via topology configurations for Kubernetes.
    
    * Refactored creation of NFS, Host Path, Empty Directory, and Persistent Volume Claims (templates and volumes) for Kubernetes to a hybrid-factory pattern for volume generation.
---
 .../scheduler/kubernetes/KubernetesContext.java    |  51 ---
 .../heron/scheduler/kubernetes/V1Controller.java   | 180 +----------
 .../apache/heron/scheduler/kubernetes/Volumes.java | 255 ++++++++++++---
 .../scheduler/kubernetes/V1ControllerTest.java     | 163 +---------
 .../heron/scheduler/kubernetes/VolumesTests.java   | 350 ++++++++++++++++++---
 5 files changed, 531 insertions(+), 468 deletions(-)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 242e4f1fadd..8d9a8795070 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -69,25 +69,6 @@ public final class KubernetesContext extends Context {
   public static final String KUBERNETES_VOLUME_TYPE = "heron.kubernetes.volume.type";
 
 
-  // HostPath volume keys
-  // https://kubernetes.io/docs/concepts/storage/volumes/#hostpath
-  public static final String KUBERNETES_VOLUME_HOSTPATH_PATH =
-      "heron.kubernetes.volume.hostPath.path";
-
-  // nfs volume keys
-  // https://kubernetes.io/docs/concepts/storage/volumes/#nfs
-  public static final String KUBERNETES_VOLUME_NFS_PATH =
-      "heron.kubernetes.volume.nfs.path";
-  public static final String KUBERNETES_VOLUME_NFS_SERVER =
-      "heron.kubernetes.volume.nfs.server";
-
-  // awsElasticBlockStore volume keys
-  // https://kubernetes.io/docs/concepts/storage/volumes/#awselasticblockstore
-  public static final String KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID =
-      "heron.kubernetes.volume.awsElasticBlockStore.volumeID";
-  public static final String KUBERNETES_VOLUME_AWS_EBS_FS_TYPE =
-      "heron.kubernetes.volume.awsElasticBlockStore.fsType";
-
   // Pod Template ConfigMap: heron.kubernetes.[executor | manager].pod.template
   public static final String KUBERNETES_POD_TEMPLATE_LOCATION =
       "heron.kubernetes.%s.pod.template";
@@ -166,38 +147,6 @@ public final class KubernetesContext extends Context {
             config.getStringValue(KUBERNETES_RESOURCE_REQUEST_MODE));
   }
 
-  static String getVolumeType(Config config) {
-    return config.getStringValue(KUBERNETES_VOLUME_TYPE);
-  }
-
-  static String getVolumeName(Config config) {
-    return config.getStringValue(KUBERNETES_VOLUME_NAME);
-  }
-
-  static String getHostPathVolumePath(Config config) {
-    return config.getStringValue(KUBERNETES_VOLUME_HOSTPATH_PATH);
-  }
-
-  static String getNfsVolumePath(Config config) {
-    return config.getStringValue(KUBERNETES_VOLUME_NFS_PATH);
-  }
-
-  static String getNfsServer(Config config) {
-    return config.getStringValue(KUBERNETES_VOLUME_NFS_SERVER);
-  }
-
-  static String getAwsEbsVolumeId(Config config) {
-    return config.getStringValue(KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID);
-  }
-
-  static String getAwsEbsFsType(Config config) {
-    return config.getStringValue(KUBERNETES_VOLUME_AWS_EBS_FS_TYPE);
-  }
-
-  static boolean hasVolume(Config config) {
-    return isNotEmpty(getVolumeType(config));
-  }
-
   static String getContainerVolumeName(Config config) {
     return config.getStringValue(KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME);
   }
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index be6e48ebe4f..1cd85e5ad6f 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -64,7 +64,6 @@ import io.kubernetes.client.openapi.models.V1LabelSelector;
 import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
-import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
 import io.kubernetes.client.openapi.models.V1PodSpec;
 import io.kubernetes.client.openapi.models.V1PodTemplate;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
@@ -78,9 +77,7 @@ import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
 import io.kubernetes.client.openapi.models.V1Status;
 import io.kubernetes.client.openapi.models.V1Toleration;
 import io.kubernetes.client.openapi.models.V1Volume;
-import io.kubernetes.client.openapi.models.V1VolumeBuilder;
 import io.kubernetes.client.openapi.models.V1VolumeMount;
-import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 import io.kubernetes.client.util.PatchUtils;
 import io.kubernetes.client.util.Yaml;
 import okhttp3.Response;
@@ -656,8 +653,6 @@ public class V1Controller extends KubernetesController {
 
     podSpec.setContainers(containers);
 
-    addVolumesIfPresent(podSpec);
-
     mountSecretsAsVolumes(podSpec);
   }
 
@@ -695,28 +690,6 @@ public class V1Controller extends KubernetesController {
     return tolerations;
   }
 
-  /**
-   * Adds volume to the <code>Pod Spec</code> that Heron requires. Heron's values taking precedence.
-   * @param spec <code>Pod Spec</code> to be configured.
-   */
-  @VisibleForTesting
-  protected void addVolumesIfPresent(final V1PodSpec spec) {
-    final Config config = getConfiguration();
-    if (KubernetesContext.hasVolume(config)) {
-      final V1Volume volumeFromConfig = Volumes.get().create(config);
-      if (volumeFromConfig != null) {
-        // Merge volumes. Deduplicate using volume's name with Heron defaults taking precedence.
-        KubernetesUtils.V1ControllerUtils<V1Volume> utils =
-            new KubernetesUtils.V1ControllerUtils<>();
-        spec.setVolumes(
-            utils.mergeListsDedupe(Collections.singletonList(volumeFromConfig), spec.getVolumes(),
-                Comparator.comparing(V1Volume::getName), "Pod Template Volumes")
-        );
-        LOG.fine("Adding volume: " + volumeFromConfig);
-      }
-    }
-  }
-
   /**
    * Adds <code>Volume Mounts</code> for <code>Secrets</code> to a pod.
    * @param podSpec <code>Pod Spec</code> to add secrets to.
@@ -1145,77 +1118,13 @@ public class V1Controller extends KubernetesController {
         continue;
       }
 
-      V1PersistentVolumeClaim claim = new V1PersistentVolumeClaimBuilder()
-          .withNewMetadata()
-            .withName(pvc.getKey())
-            .withLabels(getPersistentVolumeClaimLabels(getTopologyName()))
-          .endMetadata()
-          .withNewSpec()
-            .withStorageClassName("")
-          .endSpec()
-          .build();
-
-      // Populate PVC options.
-      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> option
-          : pvc.getValue().entrySet()) {
-        String optionValue = option.getValue();
-        switch(option.getKey()) {
-          case storageClassName:
-            claim.getSpec().setStorageClassName(optionValue);
-            break;
-          case sizeLimit:
-            claim.getSpec().setResources(
-                    new V1ResourceRequirements()
-                        .putRequestsItem("storage", new Quantity(optionValue)));
-            break;
-          case accessModes:
-            claim.getSpec().setAccessModes(Arrays.asList(optionValue.split(",")));
-            break;
-          case volumeMode:
-            claim.getSpec().setVolumeMode(optionValue);
-            break;
-          // Valid ignored options not used in a PVC.
-          default:
-            break;
-        }
-      }
-      listOfPVCs.add(claim);
+      listOfPVCs.add(Volumes.get()
+          .createPersistentVolumeClaim(pvc.getKey(),
+              getPersistentVolumeClaimLabels(getTopologyName()), pvc.getValue()));
     }
     return listOfPVCs;
   }
 
-  /**
-   * Generates the <code>Volume Mounts</code> to be placed in the <code>Executor</code>
-   * and <code>Manager</code> from options on the CLI.
-   * @param volumeName Name of the <code>Volume</code>.
-   * @param configs Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs.
-   * @return A configured <code>V1VolumeMount</code>.
-   */
-  @VisibleForTesting
-  protected V1VolumeMount createVolumeMountsCLI(final String volumeName,
-      final Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
-    final V1VolumeMount volumeMount = new V1VolumeMountBuilder()
-        .withName(volumeName)
-        .build();
-    for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config : configs.entrySet()) {
-      switch (config.getKey()) {
-        case path:
-          volumeMount.mountPath(config.getValue());
-          break;
-        case subPath:
-          volumeMount.subPath(config.getValue());
-          break;
-        case readOnly:
-          volumeMount.readOnly(Boolean.parseBoolean(config.getValue()));
-          break;
-        default:
-          break;
-      }
-    }
-
-    return volumeMount;
-  }
-
   /**
    * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>Persistent Volume Claims</code>s
    *  to be placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI.
@@ -1235,16 +1144,9 @@ public class V1Controller extends KubernetesController {
       final String claimName = configs.getValue()
           .get(KubernetesConstants.VolumeConfigKeys.claimName);
       if (claimName != null && !KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
-        volumes.add(
-            new V1VolumeBuilder()
-                .withName(volumeName)
-                .withNewPersistentVolumeClaim()
-                  .withClaimName(claimName)
-                .endPersistentVolumeClaim()
-                .build()
-        );
+        volumes.add(Volumes.get().createPersistentVolumeClaim(claimName, volumeName));
       }
-      volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+      volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
     }
   }
 
@@ -1262,27 +1164,10 @@ public class V1Controller extends KubernetesController {
     for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
         : mapOfOpts.entrySet()) {
       final String volumeName = configs.getKey();
-      final V1Volume volume = new V1VolumeBuilder()
-          .withName(volumeName)
-          .withNewEmptyDir()
-          .endEmptyDir()
-          .build();
-
-      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
-          : configs.getValue().entrySet()) {
-        switch(config.getKey()) {
-          case medium:
-            volume.getEmptyDir().medium(config.getValue());
-            break;
-          case sizeLimit:
-            volume.getEmptyDir().sizeLimit(new Quantity(config.getValue()));
-            break;
-          default:
-            break;
-        }
-      }
+      final V1Volume volume = Volumes.get()
+          .createVolume(Volumes.VolumeType.EmptyDir, volumeName, configs.getValue());
       volumes.add(volume);
-      volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+      volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
     }
   }
 
@@ -1300,27 +1185,10 @@ public class V1Controller extends KubernetesController {
     for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
         : mapOfOpts.entrySet()) {
       final String volumeName = configs.getKey();
-      final V1Volume volume = new V1VolumeBuilder()
-          .withName(volumeName)
-          .withNewHostPath()
-          .endHostPath()
-          .build();
-
-      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
-          : configs.getValue().entrySet()) {
-        switch(config.getKey()) {
-          case type:
-            volume.getHostPath().setType(config.getValue());
-            break;
-          case pathOnHost:
-            volume.getHostPath().setPath(config.getValue());
-            break;
-          default:
-            break;
-        }
-      }
+      final V1Volume volume = Volumes.get()
+          .createVolume(Volumes.VolumeType.HostPath, volumeName, configs.getValue());
       volumes.add(volume);
-      volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+      volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
     }
   }
 
@@ -1338,30 +1206,10 @@ public class V1Controller extends KubernetesController {
     for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
         : mapOfOpts.entrySet()) {
       final String volumeName = configs.getKey();
-      final V1Volume volume = new V1VolumeBuilder()
-          .withName(volumeName)
-          .withNewNfs()
-          .endNfs()
-          .build();
-
-      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
-          : configs.getValue().entrySet()) {
-        switch(config.getKey()) {
-          case server:
-            volume.getNfs().setServer(config.getValue());
-            break;
-          case pathOnNFS:
-            volume.getNfs().setPath(config.getValue());
-            break;
-          case readOnly:
-            volume.getNfs().setReadOnly(Boolean.parseBoolean(config.getValue()));
-            break;
-          default:
-            break;
-        }
-      }
+      final V1Volume volume = Volumes.get()
+          .createVolume(Volumes.VolumeType.NetworkFileSystem, volumeName, configs.getValue());
       volumes.add(volume);
-      volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+      volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
     }
   }
 
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java
index 90f15bd7334..4dcba8fdccb 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java
@@ -19,97 +19,250 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.heron.spi.common.Config;
-
-import io.kubernetes.client.openapi.models.V1AWSElasticBlockStoreVolumeSource;
-import io.kubernetes.client.openapi.models.V1HostPathVolumeSource;
-import io.kubernetes.client.openapi.models.V1NFSVolumeSource;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
 import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeBuilder;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 
 final class Volumes {
 
-  static final String AWS_EBS = "awsElasticBlockStore";
-  static final String HOST_PATH = "hostPath";
-  static final String NFS = "nfs";
-
-  private final Map<String, VolumeFactory> volumes = new HashMap<>();
+  public enum VolumeType {
+    EmptyDir,
+    HostPath,
+    NetworkFileSystem
+  }
+  private final Map<VolumeType, IVolumeFactory> volumes = new HashMap<>();
 
   private Volumes() {
-    volumes.put(HOST_PATH, new HostPathVolumeFactory());
-    volumes.put(NFS, new NfsVolumeFactory());
-    volumes.put(AWS_EBS, new AwsEbsVolumeFactory());
+    volumes.put(VolumeType.EmptyDir, new EmptyDirVolumeFactory());
+    volumes.put(VolumeType.HostPath, new HostPathVolumeFactory());
+    volumes.put(VolumeType.NetworkFileSystem, new NetworkFileSystemVolumeFactory());
   }
 
   static Volumes get() {
     return new Volumes();
   }
 
-  V1Volume create(Config config) {
-    final String volumeType = KubernetesContext.getVolumeType(config);
+  /**
+   * Creates <code>Generic</code>, <code>Empty Directory</code>, <code>Host Path</code>, and
+   * <code>Network File System</code> volumes.
+   * @param volumeType One of the supported volume types.
+   * @param volumeName The name of the volume to generate.
+   * @param configs A map of configurations.
+   * @return Fully configured <code>V1Volume</code>.
+   */
+  V1Volume createVolume(VolumeType volumeType, String volumeName,
+                        Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
     if (volumes.containsKey(volumeType)) {
-      return volumes.get(volumeType).create(config);
+      return volumes.get(volumeType).create(volumeName, configs);
     }
     return null;
   }
 
-  interface VolumeFactory {
-    V1Volume create(Config config);
+  /**
+   * Generates a <code>Volume Mount</code> from specifications.
+   * @param volumeName Name of the <code>Volume</code>.
+   * @param configs Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs.
+   * @return A configured <code>V1VolumeMount</code>.
+   */
+  V1VolumeMount createMount(String volumeName,
+                            Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+    final V1VolumeMount volumeMount = new V1VolumeMountBuilder()
+        .withName(volumeName)
+        .build();
+    for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config : configs.entrySet()) {
+      switch (config.getKey()) {
+        case path:
+          volumeMount.mountPath(config.getValue());
+          break;
+        case subPath:
+          volumeMount.subPath(config.getValue());
+          break;
+        case readOnly:
+          volumeMount.readOnly(Boolean.parseBoolean(config.getValue()));
+          break;
+        default:
+          break;
+      }
+    }
+    return volumeMount;
+  }
+
+  /**
+   * Generates <code>Persistent Volume Claims Templates</code> from a mapping of <code>Volumes</code>
+   * to <code>key-value</code> pairs of configuration options and values.
+   * @param claimName Name to be assigned to <code>Persistent Volume Claims Template</code>.
+   * @param labels Labels to be attached to the <code>Persistent Volume Claims Template</code>.
+   * @param configs <code>Volume</code> to configuration <code>key-value</code> mappings.
+   * @return Fully populated dynamically backed <code>Persistent Volume Claims</code>.
+   */
+  V1PersistentVolumeClaim createPersistentVolumeClaim(String claimName, Map<String, String> labels,
+                                        Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+    V1PersistentVolumeClaim claim = new V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(claimName)
+          .withLabels(labels)
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName("")
+        .endSpec()
+        .build();
+
+    // Populate PVC options.
+    for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> option : configs.entrySet()) {
+      String optionValue = option.getValue();
+      switch(option.getKey()) {
+        case storageClassName:
+          claim.getSpec().setStorageClassName(optionValue);
+          break;
+        case sizeLimit:
+          claim.getSpec().setResources(
+              new V1ResourceRequirements()
+                  .putRequestsItem("storage", new Quantity(optionValue)));
+          break;
+        case accessModes:
+          claim.getSpec().setAccessModes(Arrays.asList(optionValue.split(",")));
+          break;
+        case volumeMode:
+          claim.getSpec().setVolumeMode(optionValue);
+          break;
+        // Valid ignored options not used in a PVC.
+        default:
+          break;
+      }
+    }
+    return claim;
+  }
+
+  /**
+   * Generates a <code>Volume</code> with a <code>Persistent Volume Claim</code> inserted.
+   * @param claimName Name of the <code>Persistent Volume Claim</code>.
+   * @param volumeName Name of the <code>Volume</code> to place the <code>Persistent Volume Claim</code> in.
+   * @return Fully configured <code>Volume</code> with <code>Persistent Volume Claim</code> in it.
+   */
+  V1Volume createPersistentVolumeClaim(String claimName, String volumeName) {
+    return new V1VolumeBuilder()
+        .withName(volumeName)
+        .withNewPersistentVolumeClaim()
+          .withClaimName(claimName)
+        .endPersistentVolumeClaim()
+        .build();
   }
 
-  private static V1Volume newVolume(Config config) {
-    final String volumeName = KubernetesContext.getVolumeName(config);
-    return new V1Volume().name(volumeName);
+  interface IVolumeFactory {
+    V1Volume create(String volumeName, Map<KubernetesConstants.VolumeConfigKeys, String> configs);
   }
 
-  static class HostPathVolumeFactory implements VolumeFactory {
+  static class EmptyDirVolumeFactory implements IVolumeFactory {
+
+    /**
+     * Generates an <code>Empty Directory</code> <code>V1 Volume</code>.
+     * @param volumeName The name of the volume to generate.
+     * @param configs    A map of configurations.
+     * @return A fully configured <code>Empty Directory</code> volume.
+     */
     @Override
-    public V1Volume create(Config config) {
-      final V1Volume volume = newVolume(config);
+    public V1Volume create(String volumeName,
+                           Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+      final V1Volume volume = new V1VolumeBuilder()
+          .withName(volumeName)
+          .withNewEmptyDir()
+          .endEmptyDir()
+          .build();
 
-      final String path = KubernetesContext.getHostPathVolumePath(config);
-      final V1HostPathVolumeSource hostPathVolume =
-          new V1HostPathVolumeSource()
-              .path(path);
-      volume.hostPath(hostPathVolume);
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config : configs.entrySet()) {
+        switch(config.getKey()) {
+          case medium:
+            volume.getEmptyDir().medium(config.getValue());
+            break;
+          case sizeLimit:
+            volume.getEmptyDir().sizeLimit(new Quantity(config.getValue()));
+            break;
+          default:
+            break;
+        }
+      }
 
       return volume;
     }
   }
 
-  static class NfsVolumeFactory implements VolumeFactory {
-    @Override
-    public V1Volume create(Config config) {
-      final V1Volume volume = newVolume(config);
+  static class HostPathVolumeFactory implements IVolumeFactory {
 
-      final String path = KubernetesContext.getNfsVolumePath(config);
-      final String server = KubernetesContext.getNfsServer(config);
-      V1NFSVolumeSource nfsVolumeSource =
-          new V1NFSVolumeSource()
-              .path(path)
-              .server(server);
-      volume.setNfs(nfsVolumeSource);
+    /**
+     * Generates a <code>Host Path</code> <code>V1 Volume</code>.
+     * @param volumeName The name of the volume to generate.
+     * @param configs    A map of configurations.
+     * @return A fully configured <code>Host Path</code> volume.
+     */
+    @Override
+    public V1Volume create(String volumeName,
+                           Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+      final V1Volume volume = new V1VolumeBuilder()
+          .withName(volumeName)
+          .withNewHostPath()
+          .endHostPath()
+          .build();
 
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config : configs.entrySet()) {
+        switch (config.getKey()) {
+          case type:
+            volume.getHostPath().setType(config.getValue());
+            break;
+          case pathOnHost:
+            volume.getHostPath().setPath(config.getValue());
+            break;
+          default:
+            break;
+        }
+      }
       return volume;
     }
   }
 
-  static class AwsEbsVolumeFactory implements VolumeFactory {
-    @Override
-    public V1Volume create(Config config) {
-      final V1Volume volume = newVolume(config);
+  static class NetworkFileSystemVolumeFactory implements IVolumeFactory {
 
-      final String volumeId = KubernetesContext.getAwsEbsVolumeId(config);
-      final String fsType = KubernetesContext.getAwsEbsFsType(config);
-      V1AWSElasticBlockStoreVolumeSource awsElasticBlockStoreVolumeSource =
-          new V1AWSElasticBlockStoreVolumeSource()
-              .volumeID(volumeId)
-              .fsType(fsType);
-      volume.setAwsElasticBlockStore(awsElasticBlockStoreVolumeSource);
+    /**
+     * Generates a <code>Network File System</code> <code>V1 Volume</code>.
+     *
+     * @param volumeName The name of the volume to generate.
+     * @param configs    A map of configurations.
+     * @return A fully configured <code>Network File System</code> volume.
+     */
+    @Override
+    public V1Volume create(String volumeName,
+                           Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+      final V1Volume volume = new V1VolumeBuilder()
+          .withName(volumeName)
+          .withNewNfs()
+          .endNfs()
+          .build();
 
+      for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config : configs.entrySet()) {
+        switch (config.getKey()) {
+          case server:
+            volume.getNfs().setServer(config.getValue());
+            break;
+          case pathOnNFS:
+            volume.getNfs().setPath(config.getValue());
+            break;
+          case readOnly:
+            volume.getNfs().setReadOnly(Boolean.parseBoolean(config.getValue()));
+            break;
+          default:
+            break;
+        }
+      }
       return volume;
     }
   }
+
 }
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
index 162b4017896..a039ca980e5 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
@@ -678,71 +678,6 @@ public class V1ControllerTest {
     Assert.assertEquals("Container Resources are set from CLI.", expected, actual);
   }
 
-  @Test
-  public void testAddVolumesIfPresent() {
-    final String pathDefault = "config-host-volume-path";
-    final String pathNameDefault = "config-host-volume-name";
-    final Config configWithVolumes = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_VOLUME_NAME, pathNameDefault)
-        .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, Volumes.HOST_PATH)
-        .put(KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PATH, pathDefault)
-        .build();
-    final V1Controller controllerWithVol = new V1Controller(configWithVolumes, RUNTIME);
-
-    final V1Volume volumeDefault = new V1VolumeBuilder()
-        .withName(pathNameDefault)
-        .withNewHostPath()
-          .withNewPath(pathDefault)
-        .endHostPath()
-        .build();
-    final V1Volume volumeToBeKept = new V1VolumeBuilder()
-        .withName("volume-to-be-kept-name")
-        .withNewHostPath()
-          .withNewPath("volume-to-be-kept-path")
-        .endHostPath()
-        .build();
-
-    final List<V1Volume> customVolumeList = Arrays.asList(
-        new V1VolumeBuilder()
-            .withName(pathNameDefault)
-            .withNewHostPath()
-              .withNewPath("this-path-must-be-replaced")
-            .endHostPath()
-            .build(),
-        volumeToBeKept
-    );
-    final List<V1Volume> expectedDefault = Collections.singletonList(volumeDefault);
-    final List<V1Volume> expectedCustom = Arrays.asList(volumeDefault, volumeToBeKept);
-
-    // No Volumes set.
-    V1Controller controllerDoNotSetVolumes = new V1Controller(Config.newBuilder().build(), RUNTIME);
-    V1PodSpec podSpecNoSetVolumes = new V1PodSpec();
-    controllerDoNotSetVolumes.addVolumesIfPresent(podSpecNoSetVolumes);
-    Assert.assertNull(podSpecNoSetVolumes.getVolumes());
-
-    // Default. Null Volumes.
-    V1PodSpec podSpecNull = new V1PodSpecBuilder().build();
-    controllerWithVol.addVolumesIfPresent(podSpecNull);
-    Assert.assertTrue("Default VOLUMES should be set in container with null VOLUMES",
-        CollectionUtils.containsAll(expectedDefault, podSpecNull.getVolumes()));
-
-    // Empty Volumes list
-    V1PodSpec podSpecEmpty = new V1PodSpecBuilder()
-        .withVolumes(new LinkedList<>())
-        .build();
-    controllerWithVol.addVolumesIfPresent(podSpecEmpty);
-    Assert.assertTrue("Default VOLUMES should be set in container with empty VOLUMES",
-        CollectionUtils.containsAll(expectedDefault, podSpecEmpty.getVolumes()));
-
-    // Custom Volumes list
-    V1PodSpec podSpecCustom = new V1PodSpecBuilder()
-        .withVolumes(customVolumeList)
-        .build();
-    controllerWithVol.addVolumesIfPresent(podSpecCustom);
-    Assert.assertTrue("Default VOLUMES should be set in container with custom VOLUMES",
-        CollectionUtils.containsAll(expectedCustom, podSpecCustom.getVolumes()));
-  }
-
   @Test
   public void testMountVolumeIfPresent() {
     final String pathDefault = "config-host-volume-path";
@@ -936,6 +871,7 @@ public class V1ControllerTest {
     final List<V1PersistentVolumeClaim> actualClaims =
         v1ControllerWithPodTemplate.createPersistentVolumeClaims(mapPVCOpts);
 
+    Assert.assertEquals("Generated claim sizes match", expectedClaims.size(), actualClaims.size());
     Assert.assertTrue(expectedClaims.containsAll(actualClaims));
   }
 
@@ -1216,103 +1152,6 @@ public class V1ControllerTest {
     }
   }
 
-  @Test
-  public void testCreateVolumeMountsCLI() {
-    final String volumeNamePVC = "volume-name-pvc";
-    final String volumeNameHostPath = "volume-name-host-path";
-    final String volumeNameEmptyDir = "volume-name-empty-dir";
-    final String volumeNameNFS = "volume-name-nfs";
-    final String value = "inserted-value";
-
-    // Test case container.
-    // Input: [0] volume name, [1] volume options
-    // Output: The expected <V1VolumeMount>.
-    final List<TestTuple<Pair<String, Map<VolumeConfigKeys, String>>, V1VolumeMount>> testCases =
-        new LinkedList<>();
-
-    // PVC.
-    final Map<VolumeConfigKeys, String> configPVC = ImmutableMap.<VolumeConfigKeys, String>builder()
-        .put(VolumeConfigKeys.claimName, value)
-        .put(VolumeConfigKeys.storageClassName, value)
-        .put(VolumeConfigKeys.sizeLimit, value)
-        .put(VolumeConfigKeys.accessModes, value)
-        .put(VolumeConfigKeys.volumeMode, value)
-        .put(VolumeConfigKeys.path, value)
-        .put(VolumeConfigKeys.subPath, value)
-        .put(VolumeConfigKeys.readOnly, "true")
-        .build();
-    final V1VolumeMount volumeMountPVC = new V1VolumeMountBuilder()
-        .withName(volumeNamePVC)
-        .withMountPath(value)
-        .withSubPath(value)
-        .withReadOnly(true)
-        .build();
-    testCases.add(new TestTuple<>("PVC volume mount",
-        new Pair<>(volumeNamePVC, configPVC), volumeMountPVC));
-
-    // Host Path.
-    final Map<VolumeConfigKeys, String> configHostPath =
-        ImmutableMap.<VolumeConfigKeys, String>builder()
-            .put(VolumeConfigKeys.type, "DirectoryOrCreate")
-            .put(VolumeConfigKeys.pathOnHost, value)
-            .put(VolumeConfigKeys.path, value)
-            .put(VolumeConfigKeys.subPath, value)
-            .put(VolumeConfigKeys.readOnly, "true")
-            .build();
-    final V1VolumeMount volumeMountHostPath = new V1VolumeMountBuilder()
-        .withName(volumeNameHostPath)
-        .withMountPath(value)
-        .withSubPath(value)
-        .withReadOnly(true)
-        .build();
-    testCases.add(new TestTuple<>("Host Path volume mount",
-        new Pair<>(volumeNameHostPath, configHostPath), volumeMountHostPath));
-
-    // Empty Dir.
-    final Map<VolumeConfigKeys, String> configEmptyDir =
-        ImmutableMap.<VolumeConfigKeys, String>builder()
-            .put(VolumeConfigKeys.sizeLimit, value)
-            .put(VolumeConfigKeys.medium, "Memory")
-            .put(VolumeConfigKeys.path, value)
-            .put(VolumeConfigKeys.subPath, value)
-            .put(VolumeConfigKeys.readOnly, "true")
-            .build();
-    final V1VolumeMount volumeMountEmptyDir = new V1VolumeMountBuilder()
-        .withName(volumeNameEmptyDir)
-        .withMountPath(value)
-        .withSubPath(value)
-        .withReadOnly(true)
-        .build();
-    testCases.add(new TestTuple<>("Empty Dir volume mount",
-        new Pair<>(volumeNameEmptyDir, configEmptyDir), volumeMountEmptyDir));
-
-    // NFS.
-    final Map<VolumeConfigKeys, String> configNFS = ImmutableMap.<VolumeConfigKeys, String>builder()
-        .put(VolumeConfigKeys.server, "nfs.server.address")
-        .put(VolumeConfigKeys.readOnly, "true")
-        .put(VolumeConfigKeys.pathOnNFS, value)
-        .put(VolumeConfigKeys.path, value)
-        .put(VolumeConfigKeys.subPath, value)
-        .build();
-    final V1VolumeMount volumeMountNFS = new V1VolumeMountBuilder()
-        .withName(volumeNameNFS)
-        .withMountPath(value)
-        .withSubPath(value)
-        .withReadOnly(true)
-        .build();
-    testCases.add(new TestTuple<>("NFS volume mount",
-        new Pair<>(volumeNameNFS, configNFS), volumeMountNFS));
-
-    // Test loop.
-    for (TestTuple<Pair<String, Map<VolumeConfigKeys, String>>, V1VolumeMount> testCase
-        : testCases) {
-      V1VolumeMount actual = v1ControllerPodTemplate.createVolumeMountsCLI(
-          testCase.input.first, testCase.input.second);
-      Assert.assertEquals(testCase.description, testCase.expected, actual);
-    }
-
-  }
-
   @Test
   public void testCreateVolumeAndMountsEmptyDirCLI() {
     final String volumeName = "volume-name-empty-dir";
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
index 401c97f9f64..22d3088b598 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
@@ -19,67 +19,341 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.heron.spi.common.Config;
+import org.apache.heron.common.basics.Pair;
 
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
 import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeBuilder;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 
 public class VolumesTests {
 
   @Test
-  public void testNoVolume() {
-    final Config config = Config.newBuilder().build();
-    final V1Volume volume = Volumes.get().create(config);
-    Assert.assertNull(volume);
+  public void testEmptyDir() {
+    final String volumeName = "volume-name-empty-dir";
+    final String medium = "Memory";
+    final String sizeLimit = "1Gi";
+    final String path = "/path/to/mount";
+    final String subPath = "/sub/path/to/mount";
+    final Map<KubernetesConstants.VolumeConfigKeys, String> config =
+        ImmutableMap.<KubernetesConstants.VolumeConfigKeys, String>builder()
+            .put(KubernetesConstants.VolumeConfigKeys.sizeLimit, sizeLimit)
+            .put(KubernetesConstants.VolumeConfigKeys.medium, medium)
+            .put(KubernetesConstants.VolumeConfigKeys.path, path)
+            .put(KubernetesConstants.VolumeConfigKeys.subPath, subPath)
+            .build();
+    final V1Volume expectedVolume = new V1VolumeBuilder()
+        .withName(volumeName)
+        .withNewEmptyDir()
+          .withMedium(medium)
+          .withNewSizeLimit(sizeLimit)
+        .endEmptyDir()
+        .build();
+
+    final V1Volume actualVolume = Volumes.get()
+        .createVolume(Volumes.VolumeType.EmptyDir, volumeName, config);
+
+    Assert.assertEquals("Volume Factory Empty Directory", expectedVolume, actualVolume);
+  }
+
+  @Test
+  public void testHostPath() {
+    final String volumeName = "volume-name-host-path";
+    final String type = "DirectoryOrCreate";
+    final String pathOnHost = "path.on.host";
+    final String path = "/path/to/mount";
+    final String subPath = "/sub/path/to/mount";
+    final Map<KubernetesConstants.VolumeConfigKeys, String> config =
+        ImmutableMap.<KubernetesConstants.VolumeConfigKeys, String>builder()
+            .put(KubernetesConstants.VolumeConfigKeys.type, type)
+            .put(KubernetesConstants.VolumeConfigKeys.pathOnHost, pathOnHost)
+            .put(KubernetesConstants.VolumeConfigKeys.path, path)
+            .put(KubernetesConstants.VolumeConfigKeys.subPath, subPath)
+            .build();
+    final V1Volume expectedVolume = new V1VolumeBuilder()
+        .withName(volumeName)
+        .withNewHostPath()
+          .withNewType(type)
+          .withNewPath(pathOnHost)
+        .endHostPath()
+        .build();
+
+    final V1Volume actualVolume = Volumes.get()
+        .createVolume(Volumes.VolumeType.HostPath, volumeName, config);
+
+    Assert.assertEquals("Volume Factory Host Path", expectedVolume, actualVolume);
+  }
+
+  @Test
+  public void testNetworkFileSystem() {
+    final String volumeName = "volume-name-nfs";
+    final String server = "nfs.server.address";
+    final String pathOnNFS = "path.on.host";
+    final String readOnly = "true";
+    final String path = "/path/to/mount";
+    final String subPath = "/sub/path/to/mount";
+    final Map<KubernetesConstants.VolumeConfigKeys, String> config =
+        ImmutableMap.<KubernetesConstants.VolumeConfigKeys, String>builder()
+            .put(KubernetesConstants.VolumeConfigKeys.server, server)
+            .put(KubernetesConstants.VolumeConfigKeys.readOnly, readOnly)
+            .put(KubernetesConstants.VolumeConfigKeys.pathOnNFS, pathOnNFS)
+            .put(KubernetesConstants.VolumeConfigKeys.path, path)
+            .put(KubernetesConstants.VolumeConfigKeys.subPath, subPath)
+            .build();
+    final V1Volume expectedVolume = new V1VolumeBuilder()
+        .withName(volumeName)
+        .withNewNfs()
+          .withServer(server)
+          .withPath(pathOnNFS)
+          .withReadOnly(Boolean.parseBoolean(readOnly))
+        .endNfs()
+        .build();
+
+    final V1Volume actualVolume = Volumes.get()
+        .createVolume(Volumes.VolumeType.NetworkFileSystem, volumeName, config);
+
+    Assert.assertEquals("Volume Factory Network File System", expectedVolume, actualVolume);
   }
 
   @Test
-  public void testHostPathVolume() {
-    final String path = "/test/dir1";
-    final Config config = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, "hostPath")
-        .put(KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PATH, path)
+  public void testVolumeMount() {
+    final String volumeNamePVC = "volume-name-pvc";
+    final String volumeNameHostPath = "volume-name-host-path";
+    final String volumeNameEmptyDir = "volume-name-empty-dir";
+    final String volumeNameNFS = "volume-name-nfs";
+    final String value = "inserted-value";
+
+    // Test case container.
+    // Input: [0] volume name, [1] volume options
+    // Output: The expected <V1VolumeMount>.
+    final List<KubernetesUtils.TestTuple<
+        Pair<String, Map<KubernetesConstants.VolumeConfigKeys, String>>, V1VolumeMount>> testCases =
+        new LinkedList<>();
+
+    // PVC.
+    final Map<KubernetesConstants.VolumeConfigKeys, String> configPVC =
+        ImmutableMap.<KubernetesConstants.VolumeConfigKeys, String>builder()
+            .put(KubernetesConstants.VolumeConfigKeys.claimName, value)
+            .put(KubernetesConstants.VolumeConfigKeys.storageClassName, value)
+            .put(KubernetesConstants.VolumeConfigKeys.sizeLimit, value)
+            .put(KubernetesConstants.VolumeConfigKeys.accessModes, value)
+            .put(KubernetesConstants.VolumeConfigKeys.volumeMode, value)
+            .put(KubernetesConstants.VolumeConfigKeys.path, value)
+            .put(KubernetesConstants.VolumeConfigKeys.subPath, value)
+            .put(KubernetesConstants.VolumeConfigKeys.readOnly, "true")
+            .build();
+    final V1VolumeMount volumeMountPVC = new V1VolumeMountBuilder()
+        .withName(volumeNamePVC)
+        .withMountPath(value)
+        .withSubPath(value)
+        .withReadOnly(true)
+        .build();
+    testCases.add(new KubernetesUtils.TestTuple<>("PVC volume mount",
+        new Pair<>(volumeNamePVC, configPVC), volumeMountPVC));
+
+    // Host Path.
+    final Map<KubernetesConstants.VolumeConfigKeys, String> configHostPath =
+        ImmutableMap.<KubernetesConstants.VolumeConfigKeys, String>builder()
+            .put(KubernetesConstants.VolumeConfigKeys.type, "DirectoryOrCreate")
+            .put(KubernetesConstants.VolumeConfigKeys.pathOnHost, value)
+            .put(KubernetesConstants.VolumeConfigKeys.path, value)
+            .put(KubernetesConstants.VolumeConfigKeys.subPath, value)
+            .put(KubernetesConstants.VolumeConfigKeys.readOnly, "true")
+            .build();
+    final V1VolumeMount volumeMountHostPath = new V1VolumeMountBuilder()
+        .withName(volumeNameHostPath)
+        .withMountPath(value)
+        .withSubPath(value)
+        .withReadOnly(true)
+        .build();
+    testCases.add(new KubernetesUtils.TestTuple<>("Host Path volume mount",
+        new Pair<>(volumeNameHostPath, configHostPath), volumeMountHostPath));
+
+    // Empty Dir.
+    final Map<KubernetesConstants.VolumeConfigKeys, String> configEmptyDir =
+        ImmutableMap.<KubernetesConstants.VolumeConfigKeys, String>builder()
+            .put(KubernetesConstants.VolumeConfigKeys.sizeLimit, value)
+            .put(KubernetesConstants.VolumeConfigKeys.medium, "Memory")
+            .put(KubernetesConstants.VolumeConfigKeys.path, value)
+            .put(KubernetesConstants.VolumeConfigKeys.subPath, value)
+            .put(KubernetesConstants.VolumeConfigKeys.readOnly, "true")
+            .build();
+    final V1VolumeMount volumeMountEmptyDir = new V1VolumeMountBuilder()
+        .withName(volumeNameEmptyDir)
+        .withMountPath(value)
+        .withSubPath(value)
+        .withReadOnly(true)
         .build();
+    testCases.add(new KubernetesUtils.TestTuple<>("Empty Dir volume mount",
+        new Pair<>(volumeNameEmptyDir, configEmptyDir), volumeMountEmptyDir));
 
-    final V1Volume volume = Volumes.get().create(config);
-    Assert.assertNotNull(volume);
-    Assert.assertNotNull(volume.getHostPath());
-    Assert.assertEquals(volume.getHostPath().getPath(), path);
+    // NFS.
+    final Map<KubernetesConstants.VolumeConfigKeys, String> configNFS =
+        ImmutableMap.<KubernetesConstants.VolumeConfigKeys, String>builder()
+            .put(KubernetesConstants.VolumeConfigKeys.server, "nfs.server.address")
+            .put(KubernetesConstants.VolumeConfigKeys.readOnly, "true")
+            .put(KubernetesConstants.VolumeConfigKeys.pathOnNFS, value)
+            .put(KubernetesConstants.VolumeConfigKeys.path, value)
+            .put(KubernetesConstants.VolumeConfigKeys.subPath, value)
+            .build();
+    final V1VolumeMount volumeMountNFS = new V1VolumeMountBuilder()
+        .withName(volumeNameNFS)
+        .withMountPath(value)
+        .withSubPath(value)
+        .withReadOnly(true)
+        .build();
+    testCases.add(new KubernetesUtils.TestTuple<>("NFS volume mount",
+        new Pair<>(volumeNameNFS, configNFS), volumeMountNFS));
+
+    // Test loop.
+    for (KubernetesUtils.TestTuple<Pair<String, Map<KubernetesConstants.VolumeConfigKeys, String>>,
+             V1VolumeMount> testCase : testCases) {
+      V1VolumeMount actual = Volumes.get().createMount(testCase.input.first, testCase.input.second);
+      Assert.assertEquals(testCase.description, testCase.expected, actual);
+    }
   }
 
   @Test
-  public void testNfsVolume() {
-    final String path = "/test/dir1";
-    final String server = "10.10.10.10";
-    final Config config = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, "nfs")
-        .put(KubernetesContext.KUBERNETES_VOLUME_NFS_PATH, path)
-        .put(KubernetesContext.KUBERNETES_VOLUME_NFS_SERVER, server)
+  public void testPersistentVolumeClaim() {
+    final String topologyName = "topology-name";
+    final String volumeNameOne = "volume-name-one";
+    final String volumeNameTwo = "volume-name-two";
+    final String volumeNameStatic = "volume-name-static";
+    final String claimNameOne = "OnDemand";
+    final String claimNameTwo = "claim-name-two";
+    final String claimNameStatic = "OnDEmaND";
+    final String storageClassName = "storage-class-name";
+    final String sizeLimit = "555Gi";
+    final String accessModesList = "ReadWriteOnce,ReadOnlyMany,ReadWriteMany";
+    final String accessModes = "ReadOnlyMany";
+    final String volumeMode = "VolumeMode";
+    final String path = "/path/to/mount/";
+    final String subPath = "/sub/path/to/mount/";
+    final Map<String, String> labels = V1Controller.getPersistentVolumeClaimLabels(topologyName);
+
+    final Map<KubernetesConstants.VolumeConfigKeys, String> volOneConfig =
+        new HashMap<KubernetesConstants.VolumeConfigKeys, String>() {
+        {
+          put(KubernetesConstants.VolumeConfigKeys.claimName, claimNameOne);
+          put(KubernetesConstants.VolumeConfigKeys.storageClassName, storageClassName);
+          put(KubernetesConstants.VolumeConfigKeys.sizeLimit, sizeLimit);
+          put(KubernetesConstants.VolumeConfigKeys.accessModes, accessModesList);
+          put(KubernetesConstants.VolumeConfigKeys.volumeMode, volumeMode);
+          put(KubernetesConstants.VolumeConfigKeys.path, path);
+        }
+      };
+
+    final Map<KubernetesConstants.VolumeConfigKeys, String> volTwoConfig =
+        new HashMap<KubernetesConstants.VolumeConfigKeys, String>() {
+          {
+            put(KubernetesConstants.VolumeConfigKeys.claimName, claimNameTwo);
+            put(KubernetesConstants.VolumeConfigKeys.storageClassName, storageClassName);
+            put(KubernetesConstants.VolumeConfigKeys.sizeLimit, sizeLimit);
+            put(KubernetesConstants.VolumeConfigKeys.accessModes, accessModesList);
+            put(KubernetesConstants.VolumeConfigKeys.volumeMode, volumeMode);
+            put(KubernetesConstants.VolumeConfigKeys.path, path);
+            put(KubernetesConstants.VolumeConfigKeys.subPath, subPath);
+          }
+        };
+
+    final Map<KubernetesConstants.VolumeConfigKeys, String> volStaticConfig =
+        new HashMap<KubernetesConstants.VolumeConfigKeys, String>() {
+          {
+            put(KubernetesConstants.VolumeConfigKeys.claimName, claimNameStatic);
+            put(KubernetesConstants.VolumeConfigKeys.sizeLimit, sizeLimit);
+            put(KubernetesConstants.VolumeConfigKeys.accessModes, accessModes);
+            put(KubernetesConstants.VolumeConfigKeys.volumeMode, volumeMode);
+            put(KubernetesConstants.VolumeConfigKeys.path, path);
+            put(KubernetesConstants.VolumeConfigKeys.subPath, subPath);
+          }
+        };
+
+    final V1PersistentVolumeClaim claimOne = new V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(volumeNameOne)
+          .withLabels(labels)
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName(storageClassName)
+          .withAccessModes(Arrays.asList(accessModesList.split(",")))
+          .withVolumeMode(volumeMode)
+          .withNewResources()
+            .addToRequests("storage", new Quantity(sizeLimit))
+          .endResources()
+        .endSpec()
         .build();
 
-    final V1Volume volume = Volumes.get().create(config);
-    Assert.assertNotNull(volume);
-    Assert.assertNotNull(volume.getNfs());
-    Assert.assertEquals(volume.getNfs().getPath(), path);
-    Assert.assertEquals(volume.getNfs().getServer(), server);
+    final V1PersistentVolumeClaim claimTwo = new V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(volumeNameTwo)
+          .withLabels(labels)
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName(storageClassName)
+          .withAccessModes(Arrays.asList(accessModesList.split(",")))
+          .withVolumeMode(volumeMode)
+          .withNewResources()
+            .addToRequests("storage", new Quantity(sizeLimit))
+          .endResources()
+        .endSpec()
+        .build();
+
+    final V1PersistentVolumeClaim claimStatic = new V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(volumeNameStatic)
+          .withLabels(labels)
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName("")
+          .withAccessModes(Collections.singletonList(accessModes))
+          .withVolumeMode(volumeMode)
+          .withNewResources()
+            .addToRequests("storage", new Quantity(sizeLimit))
+          .endResources()
+        .endSpec()
+        .build();
+
+    final V1PersistentVolumeClaim actualPVCOne = Volumes.get()
+        .createPersistentVolumeClaim(volumeNameOne, labels, volOneConfig);
+    Assert.assertEquals("Volume one PVC", claimOne, actualPVCOne);
+
+    final V1PersistentVolumeClaim actualPVCTwo = Volumes.get()
+        .createPersistentVolumeClaim(volumeNameTwo, labels, volTwoConfig);
+    Assert.assertEquals("Volume two PVC", claimTwo, actualPVCTwo);
+
+    final V1PersistentVolumeClaim actualPVCStatic = Volumes.get()
+        .createPersistentVolumeClaim(volumeNameStatic, labels, volStaticConfig);
+    Assert.assertEquals("Volume static PVC", claimStatic, actualPVCStatic);
   }
 
   @Test
-  public void testAwsEbsVolume() {
-    final String volumeId = "aws-ebs-1";
-    final String fsType = "ext4";
-    final Config config = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, "awsElasticBlockStore")
-        .put(KubernetesContext.KUBERNETES_VOLUME_AWS_EBS_VOLUME_ID, volumeId)
-        .put(KubernetesContext.KUBERNETES_VOLUME_AWS_EBS_FS_TYPE, fsType)
+  public void testVolumeWithPersistentVolumeClaim() {
+    final String claimName = "claim-name";
+    final String volumeName = "volume-name";
+    final V1Volume expected = new V1VolumeBuilder()
+        .withName(volumeName)
+          .withNewPersistentVolumeClaim()
+        .withClaimName(claimName)
+        .endPersistentVolumeClaim()
         .build();
 
-    final V1Volume volume = Volumes.get().create(config);
-    Assert.assertNotNull(volume);
-    Assert.assertNotNull(volume.getAwsElasticBlockStore());
-    Assert.assertEquals(volume.getAwsElasticBlockStore().getVolumeID(), volumeId);
-    Assert.assertEquals(volume.getAwsElasticBlockStore().getFsType(), fsType);
+    final V1Volume actual = Volumes.get().createPersistentVolumeClaim(claimName, volumeName);
+
+    Assert.assertEquals("Volume with Persistent Volume Claim configured", expected, actual);
   }
 }