You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ni...@apache.org on 2021/07/07 14:06:49 UTC

[incubator-heron] 01/01: Added support for adding Kubernetes annotations to the topology pod

This is an automated email from the ASF dual-hosted git repository.

nicknezis pushed a commit to branch nicknezis/k8s-annotations
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 531cf4c44af1e852ab3fb01c31b72fe8da823f4c
Author: Nicholas Nezis <ni...@gmail.com>
AuthorDate: Wed Jul 7 10:02:28 2021 -0400

    Added support for adding Kubernetes annotations to the topology pod
---
 .../scheduler/kubernetes/KubernetesContext.java    | 56 ++++++++++++-------
 .../heron/scheduler/kubernetes/V1Controller.java   | 62 +++++++++++++---------
 2 files changed, 76 insertions(+), 42 deletions(-)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index f938cd0..30d771b 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -1,20 +1,20 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
  */
 
 package org.apache.heron.scheduler.kubernetes;
@@ -22,6 +22,9 @@ package org.apache.heron.scheduler.kubernetes;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public final class KubernetesContext extends Context {
   public static final String HERON_EXECUTOR_DOCKER_IMAGE = "heron.executor.docker.image";
 
@@ -44,7 +47,7 @@ public final class KubernetesContext extends Context {
      * provided in the Resource Limit. This mode effectively guarantees the
      * cpu and memory will be reserved.
      */
-    EQUAL_TO_LIMIT;
+    EQUAL_TO_LIMIT
   }
   /**
    * This config item is used to determine how to configure the K8s Resource Request.
@@ -83,6 +86,9 @@ public final class KubernetesContext extends Context {
   public static final String HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH =
       "heron.kubernetes.container.volumeMount.path";
 
+  public static final String HERON_KUBERNETES_CONTAINER_ANNOTATION =
+      "heron.kubernetes.container.annotation.";
+
   private KubernetesContext() {
   }
 
@@ -152,6 +158,20 @@ public final class KubernetesContext extends Context {
     return config.getStringValue(HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
   }
 
+  static Set<String> getConfigKeys(Config config, String key) {
+    Set<String> annotations = new HashSet<>();
+    for (String s : config.getKeySet()) {
+      if (s.startsWith(key)) {
+        annotations.add(s);
+      }
+    }
+    return annotations;
+  }
+
+  static Set<String> getContainerAnnotationKeys(Config config) {
+    return getConfigKeys(config, HERON_KUBERNETES_CONTAINER_ANNOTATION);
+  }
+
   public static boolean hasContainerVolume(Config config) {
     final String name = getContainerVolumeName(config);
     final String path = getContainerVolumeMountPath(config);
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 2056372..bc63948 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -1,20 +1,20 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
  */
 
 package org.apache.heron.scheduler.kubernetes;
@@ -104,7 +104,7 @@ public class V1Controller extends KubernetesController {
 
     final Resource containerResource = getContainerResource(packingPlan);
 
-    final V1Service topologyService = createTopologyyService();
+    final V1Service topologyService = createTopologyService();
     try {
       final V1Service response =
           coreClient.createNamespacedService(getNamespace(), topologyService, null,
@@ -163,7 +163,7 @@ public class V1Controller extends KubernetesController {
     final int newContainerCount = currentContainerCount + containersToAdd.size();
 
     try {
-      patchStatefulsetReplicas(newContainerCount);
+      patchStatefulSetReplicas(newContainerCount);
     } catch (ApiException ae) {
       throw new TopologyRuntimeManagementException(
           ae.getMessage() + "\ndetails\n" + ae.getResponseBody());
@@ -185,14 +185,14 @@ public class V1Controller extends KubernetesController {
     final int newContainerCount = currentContainerCount - containersToRemove.size();
 
     try {
-      patchStatefulsetReplicas(newContainerCount);
+      patchStatefulSetReplicas(newContainerCount);
     } catch (ApiException e) {
       throw new TopologyRuntimeManagementException(
           e.getMessage() + "\ndetails\n" + e.getResponseBody());
     }
   }
 
-  private void patchStatefulsetReplicas(int replicas) throws ApiException {
+  private void patchStatefulSetReplicas(int replicas) throws ApiException {
     final String body =
             String.format(JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
                     replicas);
@@ -317,7 +317,7 @@ public class V1Controller extends KubernetesController {
     return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID);
   }
 
-  private V1Service createTopologyyService() {
+  private V1Service createTopologyService() {
     final String topologyName = getTopologyName();
     final Config runtimeConfiguration = getRuntimeConfiguration();
 
@@ -370,7 +370,10 @@ public class V1Controller extends KubernetesController {
 
     // set up pod meta
     final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(topologyName));
-    templateMetaData.annotations(getPrometheusAnnotations());
+    Map<String, String> annotations = new HashMap<>();
+    annotations.putAll(getPodAnnotations());
+    annotations.putAll((getPrometheusAnnotations()));
+    templateMetaData.annotations(annotations);
     podTemplateSpec.setMetadata(templateMetaData);
 
     final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID);
@@ -383,6 +386,17 @@ public class V1Controller extends KubernetesController {
     return statefulSet;
   }
 
+  private Map<String, String> getPodAnnotations() {
+    Config config = getConfiguration();
+    final Map<String, String> annotations = new HashMap<>();
+    final Set<String> keys = KubernetesContext.getContainerAnnotationKeys(config);
+    for (String s : keys) {
+      String value = config.getStringValue(s);
+      annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_CONTAINER_ANNOTATION, ""), value);
+    }
+    return annotations;
+  }
+
   private Map<String, String> getPrometheusAnnotations() {
     final Map<String, String> annotations = new HashMap<>();
     annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true");
@@ -529,7 +543,7 @@ public class V1Controller extends KubernetesController {
     if (remoteDebugEnabled) {
       IntStream.range(0, numberOfInstances).forEach(i -> {
         final String portName =
-            KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + String.valueOf(i);
+            KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
         final V1ContainerPort port = new V1ContainerPort();
         port.setName(portName);
         port.setContainerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i);