You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by sa...@apache.org on 2022/05/02 19:22:53 UTC

[incubator-heron] 01/01: [K8s] created PVC generation in Volume Factory.

This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3821-Remove-Deprecated-Volumes-K8s-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 35c023e106186fe972a82f6ce2c4d7bc9e35a409
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Mon May 2 15:22:06 2022 -0400

    [K8s] created PVC generation in Volume Factory.
---
 .../apache/heron/scheduler/kubernetes/Volumes.java |  52 ++++++++-
 .../heron/scheduler/kubernetes/VolumesTests.java   | 121 ++++++++++++++++++++-
 2 files changed, 170 insertions(+), 3 deletions(-)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java
index 39e9a658d5d..d9e641f78cc 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java
@@ -19,11 +19,14 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
 import io.kubernetes.client.custom.Quantity;
 import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
 import io.kubernetes.client.openapi.models.V1Volume;
 import io.kubernetes.client.openapi.models.V1VolumeBuilder;
 import io.kubernetes.client.openapi.models.V1VolumeMount;
@@ -34,8 +37,7 @@ final class Volumes {
   public enum VolumeType {
     EmptyDir,
     HostPath,
-    NetworkFileSystem,
-    PersistentVolumeClaim
+    NetworkFileSystem
   }
   private final Map<VolumeType, IVolumeFactory> volumes = new HashMap<>();
 
@@ -94,6 +96,52 @@ final class Volumes {
     return volumeMount;
   }
 
+  /**
+   * Generates a single <code>Persistent Volume Claim</code> with the supplied name and labels,
+   * then applies the storage class, size, access mode, and volume mode options from configs.
+   * @param claimName Name to be assigned to the <code>Persistent Volume Claim</code>.
+   * @param labels Labels to be attached to the <code>Persistent Volume Claim</code>.
+   * @param configs Option <code>key-value</code> pairs; <code>accessModes</code> is comma-split.
+   * @return The claim; <code>storageClassName</code> stays <code>""</code> unless configured.
+   */
+  V1PersistentVolumeClaim createPersistentVolumeClaim(String claimName, Map<String, String> labels,
+                                        Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+    V1PersistentVolumeClaim claim = new V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(claimName)
+          .withLabels(labels)
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName("")
+        .endSpec()
+        .build();
+
+    // Apply each recognized option to the claim's spec.
+    for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> option : configs.entrySet()) {
+      String optionValue = option.getValue();
+      switch(option.getKey()) {
+        case storageClassName:
+          claim.getSpec().setStorageClassName(optionValue);
+          break;
+        case sizeLimit:
+          claim.getSpec().setResources(
+              new V1ResourceRequirements()
+                  .putRequestsItem("storage", new Quantity(optionValue)));
+          break;
+        case accessModes:
+          claim.getSpec().setAccessModes(Arrays.asList(optionValue.split(",")));
+          break;
+        case volumeMode:
+          claim.getSpec().setVolumeMode(optionValue);
+          break;
+        // Other keys (e.g. claimName, path, subPath) are not PVC options; skip them.
+        default:
+          break;
+      }
+    }
+    return claim;
+  }
+
   interface IVolumeFactory {
     V1Volume create(String volumeName, Map<KubernetesConstants.VolumeConfigKeys, String> configs);
   }
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
index a4b8c72be02..1cd7889741b 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
@@ -19,6 +19,9 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +33,9 @@ import org.junit.Test;
 
 import org.apache.heron.common.basics.Pair;
 
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
 import io.kubernetes.client.openapi.models.V1Volume;
 import io.kubernetes.client.openapi.models.V1VolumeBuilder;
 import io.kubernetes.client.openapi.models.V1VolumeMount;
@@ -216,9 +222,122 @@ public class VolumesTests {
 
     // Test loop.
     for (KubernetesUtils.TestTuple<Pair<String, Map<KubernetesConstants.VolumeConfigKeys, String>>,
-        V1VolumeMount> testCase : testCases) {
+          V1VolumeMount> testCase : testCases) {
       V1VolumeMount actual = Volumes.get().createMount(testCase.input.first, testCase.input.second);
       Assert.assertEquals(testCase.description, testCase.expected, actual);
     }
   }
+
+  @Test
+  public void testPersistentVolumeClaim() {
+    final String topologyName = "topology-name";
+    final String volumeNameOne = "volume-name-one";
+    final String volumeNameTwo = "volume-name-two";
+    final String volumeNameStatic = "volume-name-static";
+    final String claimNameOne = "OnDemand"; // claimName options are ignored by the PVC factory.
+    final String claimNameTwo = "claim-name-two";
+    final String claimNameStatic = "OnDEmaND";
+    final String storageClassName = "storage-class-name";
+    final String sizeLimit = "555Gi";
+    final String accessModesList = "ReadWriteOnce,ReadOnlyMany,ReadWriteMany";
+    final String accessModes = "ReadOnlyMany";
+    final String volumeMode = "VolumeMode";
+    final String path = "/path/to/mount/"; // path and subPath are not PVC options either.
+    final String subPath = "/sub/path/to/mount/";
+    final Map<String, String> labels = V1Controller.getPersistentVolumeClaimLabels(topologyName);
+
+    final Map<KubernetesConstants.VolumeConfigKeys, String> volOneConfig =
+        new HashMap<KubernetesConstants.VolumeConfigKeys, String>() {
+        {
+          put(KubernetesConstants.VolumeConfigKeys.claimName, claimNameOne);
+          put(KubernetesConstants.VolumeConfigKeys.storageClassName, storageClassName);
+          put(KubernetesConstants.VolumeConfigKeys.sizeLimit, sizeLimit);
+          put(KubernetesConstants.VolumeConfigKeys.accessModes, accessModesList);
+          put(KubernetesConstants.VolumeConfigKeys.volumeMode, volumeMode);
+          put(KubernetesConstants.VolumeConfigKeys.path, path);
+        }
+      };
+
+    final Map<KubernetesConstants.VolumeConfigKeys, String> volTwoConfig =
+        new HashMap<KubernetesConstants.VolumeConfigKeys, String>() {
+          {
+            put(KubernetesConstants.VolumeConfigKeys.claimName, claimNameTwo);
+            put(KubernetesConstants.VolumeConfigKeys.storageClassName, storageClassName);
+            put(KubernetesConstants.VolumeConfigKeys.sizeLimit, sizeLimit);
+            put(KubernetesConstants.VolumeConfigKeys.accessModes, accessModesList);
+            put(KubernetesConstants.VolumeConfigKeys.volumeMode, volumeMode);
+            put(KubernetesConstants.VolumeConfigKeys.path, path);
+            put(KubernetesConstants.VolumeConfigKeys.subPath, subPath);
+          }
+        };
+
+    final Map<KubernetesConstants.VolumeConfigKeys, String> volStaticConfig =
+        new HashMap<KubernetesConstants.VolumeConfigKeys, String>() {
+          {
+            put(KubernetesConstants.VolumeConfigKeys.claimName, claimNameStatic);
+            put(KubernetesConstants.VolumeConfigKeys.sizeLimit, sizeLimit);
+            put(KubernetesConstants.VolumeConfigKeys.accessModes, accessModes);
+            put(KubernetesConstants.VolumeConfigKeys.volumeMode, volumeMode);
+            put(KubernetesConstants.VolumeConfigKeys.path, path);
+            put(KubernetesConstants.VolumeConfigKeys.subPath, subPath);
+          }
+        };
+
+    final V1PersistentVolumeClaim claimOne = new V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(volumeNameOne) // Named after the volume argument, not the claimName option.
+          .withLabels(labels)
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName(storageClassName)
+          .withAccessModes(Arrays.asList(accessModesList.split(",")))
+          .withVolumeMode(volumeMode)
+          .withNewResources()
+            .addToRequests("storage", new Quantity(sizeLimit))
+          .endResources()
+        .endSpec()
+        .build();
+
+    final V1PersistentVolumeClaim claimTwo = new V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(volumeNameTwo)
+          .withLabels(labels)
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName(storageClassName)
+          .withAccessModes(Arrays.asList(accessModesList.split(",")))
+          .withVolumeMode(volumeMode)
+          .withNewResources()
+            .addToRequests("storage", new Quantity(sizeLimit))
+          .endResources()
+        .endSpec()
+        .build();
+
+    final V1PersistentVolumeClaim claimStatic = new V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(volumeNameStatic)
+          .withLabels(labels)
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName("") // Default kept when storageClassName is not configured.
+          .withAccessModes(Collections.singletonList(accessModes))
+          .withVolumeMode(volumeMode)
+          .withNewResources()
+            .addToRequests("storage", new Quantity(sizeLimit))
+          .endResources()
+        .endSpec()
+        .build();
+
+    final V1PersistentVolumeClaim actualPVCOne = Volumes.get()
+        .createPersistentVolumeClaim(volumeNameOne, labels, volOneConfig);
+    Assert.assertEquals("Volume one PVC", claimOne, actualPVCOne);
+
+    final V1PersistentVolumeClaim actualPVCTwo = Volumes.get()
+        .createPersistentVolumeClaim(volumeNameTwo, labels, volTwoConfig);
+    Assert.assertEquals("Volume two PVC", claimTwo, actualPVCTwo);
+
+    final V1PersistentVolumeClaim actualPVCStatic = Volumes.get()
+        .createPersistentVolumeClaim(volumeNameStatic, labels, volStaticConfig);
+    Assert.assertEquals("Volume static PVC", claimStatic, actualPVCStatic);
+  }
 }