Posted to commits@druid.apache.org by "nlippis (via GitHub)" <gi...@apache.org> on 2023/03/08 00:25:33 UTC

[GitHub] [druid] nlippis opened a new pull request, #13896: Pod template task adapter

nlippis opened a new pull request, #13896:
URL: https://github.com/apache/druid/pull/13896

   ### Kubernetes pod template task adapter for the Kubernetes task runner
   
   **Purpose**
   The existing Kubernetes task runner gets its pod spec by copying the pod spec of the master pod. While the existing implementation offers some configurability, it is not enough for users who need to run Peons with a completely different pod spec from the master node.
   
   **Changes**
   * Allow users to specify a Kubernetes pod spec per task type.
   * Move `PeonCommandContext` creation to `TaskAdapter`, since some implementations will not need it.
   
   **Note**
   This PR only introduces the `PodTemplateTaskAdapter`; a forthcoming PR will wire it up in `KubernetesTaskRunnerFactory`.
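
As a rough illustration of the per-task-type lookup described under **Changes**, here is a minimal sketch. It assumes templates are configured through properties named `druid.indexer.runner.k8s.podTemplate.<taskType>` (the prefix implied by `TASK_PROPERTY` in the diff) and that each value is a path to a template file. The class `PodTemplateLookup`, its methods, and the example paths are hypothetical and not part of this PR. The sketch scans properties by prefix, whereas the PR iterates over the registered `Task` subtypes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for the lookup logic; not a Druid class.
final class PodTemplateLookup
{
  // Assumed prefix, derived from TASK_PROPERTY in the diff.
  static final String PREFIX = "druid.indexer.runner.k8s.podTemplate.";

  // Collects every "<PREFIX><taskType>" property into a taskType -> path map.
  static Map<String, String> loadTemplatePaths(Properties properties)
  {
    Map<String, String> paths = new HashMap<>();
    for (String key : properties.stringPropertyNames()) {
      if (key.startsWith(PREFIX)) {
        paths.put(key.substring(PREFIX.length()), properties.getProperty(key));
      }
    }
    // Mirrors the PR's requirement that a "base" template always exists.
    if (!paths.containsKey("base")) {
      throw new IllegalArgumentException("A base pod template must be specified");
    }
    return paths;
  }

  // Returns the template for the task type, falling back to "base".
  static String resolve(Map<String, String> paths, String taskType)
  {
    return paths.getOrDefault(taskType, paths.get("base"));
  }

  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty(PREFIX + "base", "/path/to/base.yaml");
    props.setProperty(PREFIX + "index_kafka", "/path/to/kafka.yaml");

    Map<String, String> paths = loadTemplatePaths(props);
    System.out.println(resolve(paths, "index_kafka")); // dedicated template
    System.out.println(resolve(paths, "compact"));     // falls back to base
  }
}
```

The fallback keeps configuration small: only task types that need a different pod spec require their own entry.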
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `PodTemplateTaskAdapter`
    * `K8sTaskAdapter`
   
   <hr>
   
   This PR has:
   
   - [x] been self-reviewed.
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [x] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] churromorales commented on a diff in pull request #13896: Pod template task adapter

Posted by "churromorales (via GitHub)" <gi...@apache.org>.
churromorales commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1129958458


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -313,61 +288,6 @@ public Map<String, Long> getTotalTaskSlotCount()
     return ImmutableMap.of("taskQueue", (long) taskQueueConfig.getMaxSize());
   }
 
-  private List<String> javaOpts(Task task)
-  {
-    final List<String> javaOpts = new ArrayList<>();
-    Iterables.addAll(javaOpts, k8sConfig.javaOptsArray);
-
-    // Override task specific javaOpts
-    Object taskJavaOpts = task.getContextValue(
-        ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
-    );
-    if (taskJavaOpts != null) {
-      Iterables.addAll(
-          javaOpts,
-          new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
-      );
-    }
-
-    javaOpts.add(StringUtils.format("-Ddruid.port=%d", DruidK8sConstants.PORT));
-    javaOpts.add(StringUtils.format("-Ddruid.plaintextPort=%d", DruidK8sConstants.PORT));
-    javaOpts.add(StringUtils.format("-Ddruid.tlsPort=%d", node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1));
-    javaOpts.add(StringUtils.format(
-        "-Ddruid.task.executor.tlsPort=%d",
-        node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1
-    ));
-    javaOpts.add(StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", node.isEnableTlsPort())
-    );
-    return javaOpts;
-  }
-
-  private List<String> generateCommand(Task task)
-  {
-    final List<String> command = new ArrayList<>();
-    command.add("/peon.sh");
-    command.add(taskConfig.getBaseTaskDirPaths().get(0));
-    command.add(task.getId());
-    command.add("1"); // the attemptId is always 1, we never run the task twice on the same pod.
-
-    String nodeType = task.getNodeType();
-    if (nodeType != null) {
-      command.add("--nodeType");
-      command.add(nodeType);
-    }
-
-    // If the task type is queryable, we need to load broadcast segments on the peon, used for
-    // join queries
-    if (task.supportsQueries()) {
-      command.add("--loadBroadcastSegments");
-      command.add("true");
-    }
-    log.info(
-        "Peon Command for K8s job: %s",
-        ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
-    );
-    return command;
-  }
-

Review Comment:
   Can you explain how `CliPeon` is going to work with this approach? How is `AbstractTask` going to look as well?





[GitHub] [druid] nlippis commented on a diff in pull request #13896: Pod template task adapter

Posted by "nlippis (via GitHub)" <gi...@apache.org>.
nlippis commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1143276598


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java:
##########
@@ -19,15 +19,17 @@
 
 package org.apache.druid.k8s.overlord.common;
 
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.druid.indexing.common.task.Task;
 
 import java.io.IOException;
 
-public interface TaskAdapter<K, V>
+public interface TaskAdapter
 {
 
-  V fromTask(Task task, PeonCommandContext context) throws IOException;

Review Comment:
   I'm unfamiliar with the API that Fargate provides. If it does offer a K8s API, then yes, we could continue to use the `KubernetesTaskRunner`.





[GitHub] [druid] georgew5656 commented on a diff in pull request #13896: Pod template task adapter

Posted by "georgew5656 (via GitHub)" <gi...@apache.org>.
georgew5656 commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1131520570


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java:
##########
@@ -19,15 +19,17 @@
 
 package org.apache.druid.k8s.overlord.common;
 
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.druid.indexing.common.task.Task;
 
 import java.io.IOException;
 
-public interface TaskAdapter<K, V>
+public interface TaskAdapter
 {
 
-  V fromTask(Task task, PeonCommandContext context) throws IOException;

Review Comment:
   I think there was some idea that we would want to leave open the option of non-K8s task runners (e.g. Fargate), which would imply the need for non-K8s TaskAdapters. Is there a need to override this class? Wouldn't `K8sTaskAdapter implements TaskAdapter<Job, Pod>` still work here?



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+public class PodTemplateTaskAdapter implements TaskAdapter
+{
+  public static String TYPE = "PodTemplate";
+
+  private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
+  private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";
+
+  private final KubernetesClientApi client;
+  private final KubernetesTaskRunnerConfig taskRunnerConfig;
+  private final TaskConfig taskConfig;
+  private final DruidNode node;
+  private final ObjectMapper mapper;
+  private final HashMap<String, PodTemplate> templates;
+
+  public PodTemplateTaskAdapter(
+      KubernetesClientApi client,
+      KubernetesTaskRunnerConfig taskRunnerConfig,
+      TaskConfig taskConfig,
+      DruidNode node,
+      ObjectMapper mapper,
+      Properties properties
+  )
+  {
+    this.client = client;
+    this.taskRunnerConfig = taskRunnerConfig;
+    this.taskConfig = taskConfig;
+    this.node = node;
+    this.mapper = mapper;
+    this.templates = initializePodTemplates(properties);
+  }
+
+  @Override
+  public Job fromTask(Task task) throws IOException
+  {
+    PodTemplate podTemplate = templates.getOrDefault(task.getType(), templates.get("base"));
+    if (podTemplate == null) {
+      throw new ISE("Pod template spec not found for task type [%s]", task.getType());
+    }
+
+    return new JobBuilder()
+        .withNewMetadata()
+        .withName(new K8sTaskId(task).getK8sTaskId())
+        .addToLabels(getJobLabels(taskRunnerConfig))
+        .addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
+        .endMetadata()
+        .withNewSpec()
+        .withTemplate(podTemplate.getTemplate())
+        .editTemplate()
+        .editOrNewMetadata()
+        .addToAnnotations(getPodTemplateAnnotations(task))
+        .addToLabels(getPodLabels(taskRunnerConfig))
+        .endMetadata()
+        .editSpec()
+        .editFirstContainer()
+        .addAllToEnv(getEnv())
+        .endContainer()
+        .endSpec()
+        .endTemplate()
+        .withActiveDeadlineSeconds(taskRunnerConfig.maxTaskDuration.toStandardDuration().getStandardSeconds())
+        .withBackoffLimit(0)
+        .withTtlSecondsAfterFinished((int) taskRunnerConfig.taskCleanupDelay.toStandardDuration().getStandardSeconds())
+        .endSpec()
+        .build();
+  }
+
+  @Override
+  public Task toTask(Pod from) throws IOException
+  {
+    Map<String, String> annotations = from.getMetadata().getAnnotations();
+    if (annotations == null) {
+      throw new IOE("No annotations found on pod [%s]", from.getMetadata().getName());
+    }
+    String task = annotations.get(DruidK8sConstants.TASK);
+    if (task == null) {
+      throw new IOE("No task annotation found on pod [%s]", from.getMetadata().getName());
+    }
+    return mapper.readValue(Base64Compression.decompressBase64(task), Task.class);
+  }
+
+  private HashMap<String, PodTemplate> initializePodTemplates(Properties properties)
+  {
+    HashMap<String, PodTemplate> podTemplateMap = new HashMap<>();
+    Optional<PodTemplate> basePodTemplate = loadPodTemplate("base", properties);
+    if (!basePodTemplate.isPresent()) {
+      throw new IAE("Pod template task adapter requires a base pod template to be specified");
+    }
+    podTemplateMap.put("base", basePodTemplate.get());
+
+    MapperConfig config = mapper.getDeserializationConfig();
+    AnnotatedClass cls = AnnotatedClassResolver.resolveWithoutSuperTypes(config, Task.class);
+    Collection<NamedType> taskSubtypes = mapper.getSubtypeResolver().collectAndResolveSubtypesByClass(config, cls);
+    for (NamedType namedType : taskSubtypes) {

Review Comment:
   Is there a use case where we'd want multiple specs for a single task type, e.g. a bigger index_kafka job vs. a smaller index_kafka job? I don't think that needs to be supported in this PR, but it would be good to keep the option of implementing something like that in the future.
   
   I guess with this implementation you could just set `druid.indexer.runner.k8s.podTemplate.index_parallel_custom` or something and, assuming there's some field on the task (`customJobTemplate` or similar) that says which template to use, reference that field?
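
One way the idea above could look, as a sketch only: a task-level field names a custom template key, and the lookup falls back to the task type and then to `base`. The key name `customJobTemplate` comes from the comment above. The class `TemplateKeySelector`, the use of a plain context map, and the example template names are assumptions for illustration and do not exist in this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not a Druid class.
final class TemplateKeySelector
{
  // Hypothetical field name taken from the review comment.
  static final String CONTEXT_KEY = "customJobTemplate";

  // Prefers a template named in the task context, then the task type, then "base".
  static String select(String taskType, Map<String, Object> taskContext, Map<String, String> templates)
  {
    Object custom = taskContext.get(CONTEXT_KEY);
    if (custom instanceof String && templates.containsKey(custom)) {
      return templates.get(custom);
    }
    return templates.getOrDefault(taskType, templates.get("base"));
  }

  public static void main(String[] args)
  {
    Map<String, String> templates = new HashMap<>();
    templates.put("base", "base-template");
    templates.put("index_parallel", "parallel-template");
    templates.put("index_parallel_custom", "large-parallel-template");

    Map<String, Object> context = new HashMap<>();
    context.put(CONTEXT_KEY, "index_parallel_custom");

    System.out.println(select("index_parallel", context, templates));
    System.out.println(select("index_parallel", new HashMap<>(), templates));
  }
}
```

An unknown key in the context silently falls back here; a real implementation might prefer to reject the task instead.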





[GitHub] [druid] churromorales commented on pull request #13896: Pod template task adapter

Posted by "churromorales (via GitHub)" <gi...@apache.org>.
churromorales commented on PR #13896:
URL: https://github.com/apache/druid/pull/13896#issuecomment-1460765361

   Could you update the README for this extension to explain how to use this? That would help a lot in reviewing this. I am a bit unclear on how this is initialized, which properties need to be set vs. configurations, etc.




[GitHub] [druid] georgew5656 commented on a diff in pull request #13896: Pod template task adapter

Posted by "georgew5656 (via GitHub)" <gi...@apache.org>.
georgew5656 commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1131652703


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java:
##########
@@ -19,15 +19,17 @@
 
 package org.apache.druid.k8s.overlord.common;
 
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.druid.indexing.common.task.Task;
 
 import java.io.IOException;
 
-public interface TaskAdapter<K, V>
+public interface TaskAdapter
 {
 
-  V fromTask(Task task, PeonCommandContext context) throws IOException;

Review Comment:
   Realizing `TaskAdapter` is actually a K8s-only concept, so this comment is not relevant.
   
   I do find it a little confusing that `TaskAdapter` is only for K8s, though.





[GitHub] [druid] abhishekagarwal87 commented on a diff in pull request #13896: Pod template task adapter

Posted by "abhishekagarwal87 (via GitHub)" <gi...@apache.org>.
abhishekagarwal87 commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1140111190


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+public class PodTemplateTaskAdapter implements TaskAdapter

Review Comment:
   Please add Javadocs on this class.



##########
extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KubernetesTaskRunnerFactoryTest
+{
+  private ObjectMapper objectMapper;
+  private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
+  private StartupLoggingConfig startupLoggingConfig;
+  private TaskQueueConfig taskQueueConfig;
+  private TaskLogPusher taskLogPusher;
+  private DruidNode druidNode;
+  private TaskConfig taskConfig;
+
+  @Before
+  public void setup()
+  {
+    objectMapper = new TestUtils().getTestObjectMapper();
+    kubernetesTaskRunnerConfig = new KubernetesTaskRunnerConfig();
+    startupLoggingConfig = new StartupLoggingConfig();
+    taskQueueConfig = new TaskQueueConfig(
+        1,
+        null,
+        null,
+        null
+    );
+    taskLogPusher = new NoopTaskLogs();
+    druidNode = new DruidNode(
+        "test",
+        "",
+        false,
+        0,
+        1,
+        true,
+        false
+    );
+    taskConfig = new TaskConfig(
+        "/tmp",
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        false,
+        false,
+        null,
+        null,
+        false,
+        ImmutableList.of("/tmp")
+    );
+  }
+
+  @Test
+  public void test_get_returnsSameKuberentesTaskRunner_asBuild()
+  {
+    KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+        objectMapper,
+        kubernetesTaskRunnerConfig,
+        startupLoggingConfig,
+        taskQueueConfig,
+        taskLogPusher,
+        druidNode,
+        taskConfig
+    );
+
+    KubernetesTaskRunner expectedRunner = factory.build();
+    KubernetesTaskRunner actualRunner = factory.get();
+
+    Assert.assertEquals(expectedRunner, actualRunner);
+  }
+
+  @Test
+  public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSingleContainerTaskAdapter()
+  {
+    KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+        objectMapper,
+        kubernetesTaskRunnerConfig,
+        startupLoggingConfig,
+        taskQueueConfig,
+        taskLogPusher,
+        druidNode,
+        taskConfig
+    );
+
+    KubernetesTaskRunner runner = factory.build();
+    Assert.assertNotNull(runner);
+  }
+
+  @Test
+  public void test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiContainerTaskAdapter()
+  {
+    kubernetesTaskRunnerConfig.sidecarSupport = true;
+
+    KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+        objectMapper,
+        kubernetesTaskRunnerConfig,
+        startupLoggingConfig,
+        taskQueueConfig,
+        taskLogPusher,
+        druidNode,
+        taskConfig
+    );
+
+    KubernetesTaskRunner runner = factory.build();
+
+    Assert.assertNotNull(runner);

Review Comment:
   You are verifying that the runner is not null, but the test doesn't verify that the runner has a multi-container task adapter.
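
A minimal sketch of the stronger check this comment asks for, using stand-in types only. It assumes the runner exposes its adapter through some accessor. Every name below (`AdapterTypeCheck`, `SketchRunner`, `getAdapter`, and the adapter classes) is hypothetical, and whether `KubernetesTaskRunner` offers such an accessor is not shown in this diff.

```java
// Stand-in types for illustration; none of these are Druid classes.
final class AdapterTypeCheck
{
  interface SketchAdapter {}

  static final class SingleContainerSketchAdapter implements SketchAdapter {}

  static final class MultiContainerSketchAdapter implements SketchAdapter {}

  static final class SketchRunner
  {
    private final SketchAdapter adapter;

    SketchRunner(SketchAdapter adapter)
    {
      this.adapter = adapter;
    }

    // Hypothetical accessor that makes the adapter choice observable in tests.
    SketchAdapter getAdapter()
    {
      return adapter;
    }
  }

  // Mimics a factory that picks the adapter based on a sidecar flag.
  static SketchRunner build(boolean sidecarSupport)
  {
    return new SketchRunner(
        sidecarSupport ? new MultiContainerSketchAdapter() : new SingleContainerSketchAdapter()
    );
  }

  public static void main(String[] args)
  {
    SketchRunner runner = build(true);
    // Assert on the adapter type, not just on the runner being non-null.
    if (!(runner.getAdapter() instanceof MultiContainerSketchAdapter)) {
      throw new AssertionError("expected a multi-container adapter");
    }
    System.out.println("adapter type verified");
  }
}
```

In a JUnit test the same check could be written with `Assert.assertTrue` on the `instanceof` expression.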



##########
extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KubernetesTaskRunnerFactoryTest
+{
+  private ObjectMapper objectMapper;
+  private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
+  private StartupLoggingConfig startupLoggingConfig;
+  private TaskQueueConfig taskQueueConfig;
+  private TaskLogPusher taskLogPusher;
+  private DruidNode druidNode;
+  private TaskConfig taskConfig;
+
+  @Before
+  public void setup()
+  {
+    objectMapper = new TestUtils().getTestObjectMapper();
+    kubernetesTaskRunnerConfig = new KubernetesTaskRunnerConfig();
+    startupLoggingConfig = new StartupLoggingConfig();
+    taskQueueConfig = new TaskQueueConfig(
+        1,
+        null,
+        null,
+        null
+    );
+    taskLogPusher = new NoopTaskLogs();
+    druidNode = new DruidNode(
+        "test",
+        "",
+        false,
+        0,
+        1,
+        true,
+        false
+    );
+    taskConfig = new TaskConfig(
+        "/tmp",
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        false,
+        false,
+        null,
+        null,
+        false,
+        ImmutableList.of("/tmp")
+    );
+  }
+
+  @Test
+  public void test_get_returnsSameKuberentesTaskRunner_asBuild()
+  {
+    KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+        objectMapper,
+        kubernetesTaskRunnerConfig,
+        startupLoggingConfig,
+        taskQueueConfig,
+        taskLogPusher,
+        druidNode,
+        taskConfig
+    );
+
+    KubernetesTaskRunner expectedRunner = factory.build();
+    KubernetesTaskRunner actualRunner = factory.get();
+
+    Assert.assertEquals(expectedRunner, actualRunner);
+  }
+
+  @Test
+  public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSingleContainerTaskAdapter()
+  {
+    KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+        objectMapper,
+        kubernetesTaskRunnerConfig,
+        startupLoggingConfig,
+        taskQueueConfig,
+        taskLogPusher,
+        druidNode,
+        taskConfig
+    );
+
+    KubernetesTaskRunner runner = factory.build();
+    Assert.assertNotNull(runner);
+  }
+
+  @Test
+  public void test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiContainerTaskAdapter()
+  {
+    kubernetesTaskRunnerConfig.sidecarSupport = true;
+
+    KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+        objectMapper,
+        kubernetesTaskRunnerConfig,
+        startupLoggingConfig,
+        taskQueueConfig,
+        taskLogPusher,
+        druidNode,
+        taskConfig
+    );
+
+    KubernetesTaskRunner runner = factory.build();
+
+    Assert.assertNotNull(runner);
+  }
+
+  @Test
+  public void test_build_withClientProxyDisabled_returnsKubernetesTaskRunnerWithDruidKubernetesClientWithoutClientProxySupport()
+  {
+    kubernetesTaskRunnerConfig.disableClientProxy = true;
+
+    KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
+        objectMapper,
+        kubernetesTaskRunnerConfig,
+        startupLoggingConfig,
+        taskQueueConfig,
+        taskLogPusher,
+        druidNode,
+        taskConfig
+    );
+
+    KubernetesTaskRunner runner = factory.build();
+
+    Assert.assertNotNull(runner);

Review Comment:
   The same comment as above applies here.



##########
extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java:
##########


Review Comment:
   Please do add error message verification as well. 



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java:
##########
@@ -19,15 +19,17 @@
 
 package org.apache.druid.k8s.overlord.common;
 
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.druid.indexing.common.task.Task;
 
 import java.io.IOException;
 
-public interface TaskAdapter<K, V>
+public interface TaskAdapter
 {
 
-  V fromTask(Task task, PeonCommandContext context) throws IOException;

Review Comment:
   Why would Fargate be implemented through a different runner? Doesn't Fargate offer the Kubernetes API?



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+public class PodTemplateTaskAdapter implements TaskAdapter
+{
+  public static String TYPE = "PodTemplate";
+
+  private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
+  private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";

Review Comment:
   Are you going to document this in a follow-up PR?



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+public class PodTemplateTaskAdapter implements TaskAdapter
+{
+  public static String TYPE = "PodTemplate";
+
+  private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
+  private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";
+
+  private final KubernetesClientApi client;
+  private final KubernetesTaskRunnerConfig taskRunnerConfig;
+  private final TaskConfig taskConfig;
+  private final DruidNode node;
+  private final ObjectMapper mapper;
+  private final HashMap<String, PodTemplate> templates;
+
+  public PodTemplateTaskAdapter(
+      KubernetesClientApi client,
+      KubernetesTaskRunnerConfig taskRunnerConfig,
+      TaskConfig taskConfig,
+      DruidNode node,
+      ObjectMapper mapper,
+      Properties properties
+  )
+  {
+    this.client = client;
+    this.taskRunnerConfig = taskRunnerConfig;
+    this.taskConfig = taskConfig;
+    this.node = node;
+    this.mapper = mapper;
+    this.templates = initializePodTemplates(properties);
+  }
+
+  @Override
+  public Job fromTask(Task task) throws IOException

Review Comment:
   This method could use some documentation for future readers, e.g. why is the backoff limit set to 0?



##########
extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Properties;
+
+@EnableKubernetesMockClient()
+public class PodTemplateTaskAdapterTest
+{
+  @TempDir private Path tempDir;
+  private KubernetesClient client;
+  private KubernetesTaskRunnerConfig taskRunnerConfig;
+  private TestKubernetesClient testClient;
+  private PodTemplate podTemplateSpec;
+  private TaskConfig taskConfig;
+  private DruidNode node;
+  private ObjectMapper mapper;
+
+  @BeforeEach
+  public void setup()
+  {
+    taskRunnerConfig = new KubernetesTaskRunnerConfig();
+    testClient = new TestKubernetesClient(client);
+    taskConfig = new TaskConfig(
+        "/tmp",
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        false,
+        false,
+        null,
+        null,
+        false,
+        ImmutableList.of("/tmp")
+    );
+    node = new DruidNode(
+        "test",
+        "",
+        false,
+        0,
+        1,
+        true,
+        false
+    );
+    mapper = new TestUtils().getTestObjectMapper();
+    podTemplateSpec = client
+        .v1()
+        .podTemplates()
+        .load(this.getClass()
+            .getClassLoader()
+            .getResourceAsStream("basePodTemplate.yaml")
+        )
+        .get();
+  }
+
+  @Test
+  public void test_fromTask_withoutBasePodTemplateInRuntimeProperites_raisesIAE()
+  {
+    Assert.assertThrows(IAE.class, () -> new PodTemplateTaskAdapter(

Review Comment:
   Please also verify that the correct error message is included in the exception. 
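One way to act on this request, sketched in plain Java with no test framework so that it runs on its own. The class `ErrorMessageCheck`, the method `requireBaseTemplate`, and the message text are hypothetical stand-ins for the adapter's constructor and its real message, which this thread does not show. The idea is to capture the thrown exception and compare its message, rather than only checking the exception type.

```java
import java.util.Properties;

// Hypothetical stand-in for the code under test; names and message text are assumptions.
final class ErrorMessageCheck
{
  static final String BASE_KEY = "druid.indexer.runner.k8s.podTemplate.base";
  static final String EXPECTED_MESSAGE = "A base pod template must be specified";

  // Plays the role of the constructor: rejects properties without a base template.
  static void requireBaseTemplate(Properties properties)
  {
    if (properties.getProperty(BASE_KEY) == null) {
      throw new IllegalArgumentException(EXPECTED_MESSAGE);
    }
  }

  // Runs the action and returns the exception it throws, so the caller can inspect it.
  static <T extends Throwable> T expectThrows(Class<T> type, Runnable action)
  {
    try {
      action.run();
    }
    catch (Throwable t) {
      if (type.isInstance(t)) {
        return type.cast(t);
      }
      throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
    }
    throw new AssertionError("Expected " + type.getName() + " but nothing was thrown");
  }

  public static void main(String[] args)
  {
    IllegalArgumentException e = expectThrows(
        IllegalArgumentException.class,
        () -> requireBaseTemplate(new Properties())
    );
    // Verify the message, not just the type.
    if (!EXPECTED_MESSAGE.equals(e.getMessage())) {
      throw new AssertionError("Wrong message: " + e.getMessage());
    }
    System.out.println("Verified message: " + e.getMessage());
  }
}
```

With JUnit 4.13 or later, `Assert.assertThrows` already returns the thrown exception, so the same check can be written as an `Assert.assertEquals` on `getMessage()` of its return value.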



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] nlippis commented on pull request #13896: Pod template task adapter

Posted by "nlippis (via GitHub)" <gi...@apache.org>.
nlippis commented on PR #13896:
URL: https://github.com/apache/druid/pull/13896#issuecomment-1460813233

   > Could you update the README for this extension on how to use this? That would help a lot in reviewing this. I am a bit unclear how this is initialized, what properties need to be set, vs configurations, etc...
   
   This PR just introduces the `PodTemplateTaskAdapter`; a follow-up PR will wire it up. I will include public-facing documentation about how to use it there.
   
   That said, the user will specify `druid.indexer.runner.k8s.adapter.type={SingleContainer|MultiContainer|PodTemplate}`. The user can then specify a pod template per task type by giving the path to each file, i.e. `druid.indexer.runner.k8s.podTemplate.{task_type}=/path/to/task.yml`.
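A minimal sketch of how such properties could be resolved, assuming the lookup falls back to a `base` template when no task-specific one is configured (as the `getOrDefault` call in the quoted `fromTask` suggests). The class `PodTemplatePathResolver`, its methods, and the error message are hypothetical; the sketch maps task types to file paths only and does not load any YAML.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the PR.
final class PodTemplatePathResolver
{
  // Property prefix as quoted in the comment above.
  static final String PREFIX = "druid.indexer.runner.k8s.podTemplate.";

  // Collects every "<PREFIX><taskType>" property into a taskType -> path map.
  static Map<String, String> collectTemplatePaths(Properties properties)
  {
    Map<String, String> paths = new HashMap<>();
    for (String name : properties.stringPropertyNames()) {
      if (name.startsWith(PREFIX)) {
        paths.put(name.substring(PREFIX.length()), properties.getProperty(name));
      }
    }
    // Assumption: a base template is mandatory; the message text is made up.
    if (!paths.containsKey("base")) {
      throw new IllegalArgumentException("A base pod template must be specified");
    }
    return paths;
  }

  // Uses the task-specific template if present, otherwise the base template.
  static String resolve(Map<String, String> paths, String taskType)
  {
    return paths.getOrDefault(taskType, paths.get("base"));
  }

  public static void main(String[] args)
  {
    Properties properties = new Properties();
    properties.setProperty(PREFIX + "base", "/path/to/base.yml");
    properties.setProperty(PREFIX + "index_parallel", "/path/to/index_parallel.yml");

    Map<String, String> paths = collectTemplatePaths(properties);
    System.out.println(resolve(paths, "index_parallel"));
    System.out.println(resolve(paths, "noop"));
  }
}
```

Here `index_parallel` gets its own template, while `noop` has none configured and so resolves to the base template path.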




[GitHub] [druid] nlippis commented on a diff in pull request #13896: Pod template task adapter

Posted by "nlippis (via GitHub)" <gi...@apache.org>.
nlippis commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1143277808


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+public class PodTemplateTaskAdapter implements TaskAdapter
+{
+  public static String TYPE = "PodTemplate";
+
+  private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
+  private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";

Review Comment:
   Yes, these will be documented in the next PR that hooks the PodTemplateTaskAdapter up.





[GitHub] [druid] dclim merged pull request #13896: Pod template task adapter

Posted by "dclim (via GitHub)" <gi...@apache.org>.
dclim merged PR #13896:
URL: https://github.com/apache/druid/pull/13896




[GitHub] [druid] nlippis commented on a diff in pull request #13896: Pod template task adapter

Posted by "nlippis (via GitHub)" <gi...@apache.org>.
nlippis commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1131722506


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java:
##########
@@ -19,15 +19,17 @@
 
 package org.apache.druid.k8s.overlord.common;
 
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.druid.indexing.common.task.Task;
 
 import java.io.IOException;
 
-public interface TaskAdapter<K, V>
+public interface TaskAdapter
 {
 
-  V fromTask(Task task, PeonCommandContext context) throws IOException;

Review Comment:
   We would implement Fargate within a FargateTaskRunner. The `TaskAdapter` concept is specific to the KubernetesTaskRunner; it is not a general Druid concept.





[GitHub] [druid] churromorales commented on a diff in pull request #13896: Pod template task adapter

Posted by "churromorales (via GitHub)" <gi...@apache.org>.
churromorales commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1129952031


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+public class PodTemplateTaskAdapter implements TaskAdapter
+{
+  public static String TYPE = "PodTemplate";
+
+  private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
+  private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";
+
+  private final KubernetesClientApi client;
+  private final KubernetesTaskRunnerConfig taskRunnerConfig;
+  private final TaskConfig taskConfig;
+  private final DruidNode node;
+  private final ObjectMapper mapper;
+  private final HashMap<String, PodTemplate> templates;
+
+  public PodTemplateTaskAdapter(
+      KubernetesClientApi client,
+      KubernetesTaskRunnerConfig taskRunnerConfig,
+      TaskConfig taskConfig,
+      DruidNode node,
+      ObjectMapper mapper,
+      Properties properties
+  )
+  {
+    this.client = client;
+    this.taskRunnerConfig = taskRunnerConfig;
+    this.taskConfig = taskConfig;
+    this.node = node;
+    this.mapper = mapper;
+    this.templates = initializePodTemplates(properties);
+  }
+
+  @Override
+  public Job fromTask(Task task) throws IOException
+  {
+    PodTemplate podTemplate = templates.getOrDefault(task.getType(), templates.get("base"));
+    if (podTemplate == null) {
+      throw new ISE("Pod template spec not found for task type [%s]", task.getType());
+    }
+
+    return new JobBuilder()
+        .withNewMetadata()
+        .withName(new K8sTaskId(task).getK8sTaskId())
+        .addToLabels(getJobLabels(taskRunnerConfig))
+        .addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
+        .endMetadata()
+        .withNewSpec()
+        .withTemplate(podTemplate.getTemplate())

Review Comment:
   Will this work with sidecars injected by a controller? Take something like Istio, where a control plane injects a sidecar container into your pod. I think the job would run and then never really complete: only the main container would exit, while the Istio container would never shut down. I don't know an easy way to handle this unless you can do some more templating in YAML, but at the very least I would call it out: ```If you have a service mesh that injects sidecars (like Istio), this approach may not work.```





[GitHub] [druid] nlippis commented on a diff in pull request #13896: Pod template task adapter

Posted by "nlippis (via GitHub)" <gi...@apache.org>.
nlippis commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1129978343


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+public class PodTemplateTaskAdapter implements TaskAdapter
+{
+  public static String TYPE = "PodTemplate";
+
+  private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
+  private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";
+
+  private final KubernetesClientApi client;
+  private final KubernetesTaskRunnerConfig taskRunnerConfig;
+  private final TaskConfig taskConfig;
+  private final DruidNode node;
+  private final ObjectMapper mapper;
+  private final HashMap<String, PodTemplate> templates;
+
+  public PodTemplateTaskAdapter(
+      KubernetesClientApi client,
+      KubernetesTaskRunnerConfig taskRunnerConfig,
+      TaskConfig taskConfig,
+      DruidNode node,
+      ObjectMapper mapper,
+      Properties properties
+  )
+  {
+    this.client = client;
+    this.taskRunnerConfig = taskRunnerConfig;
+    this.taskConfig = taskConfig;
+    this.node = node;
+    this.mapper = mapper;
+    this.templates = initializePodTemplates(properties);
+  }
+
+  @Override
+  public Job fromTask(Task task) throws IOException
+  {
+    PodTemplate podTemplate = templates.getOrDefault(task.getType(), templates.get("base"));
+    if (podTemplate == null) {
+      throw new ISE("Pod template spec not found for task type [%s]", task.getType());
+    }
+
+    return new JobBuilder()
+        .withNewMetadata()
+        .withName(new K8sTaskId(task).getK8sTaskId())
+        .addToLabels(getJobLabels(taskRunnerConfig))
+        .addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
+        .endMetadata()
+        .withNewSpec()
+        .withTemplate(podTemplate.getTemplate())

Review Comment:
   Users can write their pod spec however they like to fit the peculiarities of their Kubernetes infrastructure. As @georgew5656 mentioned, there are a few ways to handle sidecars; from our perspective, Druid need not be opinionated about how they are started, stopped, or configured.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] georgew5656 commented on a diff in pull request #13896: Pod template task adapter

Posted by "georgew5656 (via GitHub)" <gi...@apache.org>.
georgew5656 commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1129954000


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+public class PodTemplateTaskAdapter implements TaskAdapter
+{
+  public static String TYPE = "PodTemplate";
+
+  private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
+  private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";
+
+  private final KubernetesClientApi client;
+  private final KubernetesTaskRunnerConfig taskRunnerConfig;
+  private final TaskConfig taskConfig;
+  private final DruidNode node;
+  private final ObjectMapper mapper;
+  private final HashMap<String, PodTemplate> templates;
+
+  public PodTemplateTaskAdapter(
+      KubernetesClientApi client,
+      KubernetesTaskRunnerConfig taskRunnerConfig,
+      TaskConfig taskConfig,
+      DruidNode node,
+      ObjectMapper mapper,
+      Properties properties
+  )
+  {
+    this.client = client;
+    this.taskRunnerConfig = taskRunnerConfig;
+    this.taskConfig = taskConfig;
+    this.node = node;
+    this.mapper = mapper;
+    this.templates = initializePodTemplates(properties);
+  }
+
+  @Override
+  public Job fromTask(Task task) throws IOException
+  {
+    PodTemplate podTemplate = templates.getOrDefault(task.getType(), templates.get("base"));
+    if (podTemplate == null) {
+      throw new ISE("Pod template spec not found for task type [%s]", task.getType());
+    }
+
+    return new JobBuilder()
+        .withNewMetadata()
+        .withName(new K8sTaskId(task).getK8sTaskId())
+        .addToLabels(getJobLabels(taskRunnerConfig))
+        .addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
+        .endMetadata()
+        .withNewSpec()
+        .withTemplate(podTemplate.getTemplate())

Review Comment:
   Since the template overrides the run command, one option is to add a step after the main run command that shuts down the Istio sidecar.
   
   e.g.
   java ... CliPeon...
   exit_code=$?; curl -fsI -X POST http://localhost:15020/quitquitquit && exit $exit_code
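
   The wrapping described above could be generated rather than hand-written. The following is a minimal sketch, not part of this PR: `SidecarAwareCommand` and `wrap` are hypothetical names, the port and path are taken from the example above, and the peon command in `main` is a placeholder. Unlike the one-liner, the sketch always exits with the peon's exit code, even if the `curl` call fails.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not part of Druid): builds a container command that runs
 * the peon, asks the Istio sidecar to quit, and then exits with the peon's code.
 */
final class SidecarAwareCommand
{
  // Assumption: the sidecar agent listens on localhost:15020, as in the comment above.
  static final String QUIT_URL = "http://localhost:15020/quitquitquit";

  static List<String> wrap(String peonCommand)
  {
    String script = peonCommand + "\n"
        + "exit_code=$?\n"                          // remember the peon's exit status
        + "curl -fsI -X POST " + QUIT_URL + "\n"    // ask the sidecar to shut down
        + "exit $exit_code";                        // report the peon's status, not curl's
    return Arrays.asList("sh", "-c", script);
  }

  public static void main(String[] args)
  {
    // Placeholder peon invocation; a real pod template would supply its own.
    wrap("/peon.sh /tmp/task example_task 1").forEach(System.out::println);
  }
}
```

   The returned list is shaped like a container `command` array (`sh`, `-c`, script), so it could be pasted into a pod template by whatever tooling renders it.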





[GitHub] [druid] nlippis commented on a diff in pull request #13896: Pod template task adapter

Posted by "nlippis (via GitHub)" <gi...@apache.org>.
nlippis commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1130054888


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -313,61 +288,6 @@ public Map<String, Long> getTotalTaskSlotCount()
     return ImmutableMap.of("taskQueue", (long) taskQueueConfig.getMaxSize());
   }
 
-  private List<String> javaOpts(Task task)
-  {
-    final List<String> javaOpts = new ArrayList<>();
-    Iterables.addAll(javaOpts, k8sConfig.javaOptsArray);
-
-    // Override task specific javaOpts
-    Object taskJavaOpts = task.getContextValue(
-        ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
-    );
-    if (taskJavaOpts != null) {
-      Iterables.addAll(
-          javaOpts,
-          new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
-      );
-    }
-
-    javaOpts.add(StringUtils.format("-Ddruid.port=%d", DruidK8sConstants.PORT));
-    javaOpts.add(StringUtils.format("-Ddruid.plaintextPort=%d", DruidK8sConstants.PORT));
-    javaOpts.add(StringUtils.format("-Ddruid.tlsPort=%d", node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1));
-    javaOpts.add(StringUtils.format(
-        "-Ddruid.task.executor.tlsPort=%d",
-        node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1
-    ));
-    javaOpts.add(StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", node.isEnableTlsPort())
-    );
-    return javaOpts;
-  }
-
-  private List<String> generateCommand(Task task)
-  {
-    final List<String> command = new ArrayList<>();
-    command.add("/peon.sh");
-    command.add(taskConfig.getBaseTaskDirPaths().get(0));
-    command.add(task.getId());
-    command.add("1"); // the attemptId is always 1, we never run the task twice on the same pod.
-
-    String nodeType = task.getNodeType();
-    if (nodeType != null) {
-      command.add("--nodeType");
-      command.add(nodeType);
-    }
-
-    // If the task type is queryable, we need to load broadcast segments on the peon, used for
-    // join queries
-    if (task.supportsQueries()) {
-      command.add("--loadBroadcastSegments");
-      command.add("true");
-    }
-    log.info(
-        "Peon Command for K8s job: %s",
-        ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
-    );
-    return command;
-  }
-

Review Comment:
   No changes are required to CliPeon or AbstractTask. With the `PodTemplateTaskAdapter`, the user specifies everything. Going forward, we plan to set sane defaults in the Helm chart, but those changes will come in a future PR.
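
   To illustrate what "the user specifies everything" means in practice, here is a minimal sketch of how a template file could be chosen per task type with a fallback to `base`, mirroring the `getOrDefault` lookup in `fromTask`. It is not the adapter itself: `PodTemplateLookup` and `templateFileFor` are hypothetical names, the sketch resolves file paths on each call rather than loading templates once in the constructor, and it assumes `INDEXER_RUNNER_PROPERTY_PREFIX` resolves to `druid.indexer.runner`.

```java
import java.util.Properties;

/** Hypothetical stand-in for the adapter's template selection; not Druid code. */
final class PodTemplateLookup
{
  // Assumption: IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX is "druid.indexer.runner".
  static final String TASK_PROPERTY = "druid.indexer.runner.k8s.podTemplate.%s";

  static String templateFileFor(String taskType, Properties properties)
  {
    String base = properties.getProperty(String.format(TASK_PROPERTY, "base"));
    if (base == null) {
      // The adapter in this PR also refuses to start without a base template.
      throw new IllegalArgumentException("A base pod template must be specified");
    }
    // Use the task-type-specific template if one is configured, otherwise the base one.
    return properties.getProperty(String.format(TASK_PROPERTY, taskType), base);
  }

  public static void main(String[] args)
  {
    Properties properties = new Properties();
    // Hypothetical file paths, for illustration only.
    properties.setProperty("druid.indexer.runner.k8s.podTemplate.base", "/templates/base.yaml");
    properties.setProperty("druid.indexer.runner.k8s.podTemplate.index_parallel", "/templates/index_parallel.yaml");

    System.out.println(templateFileFor("index_parallel", properties));
    System.out.println(templateFileFor("kill", properties));
  }
}
```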





[GitHub] [druid] georgew5656 commented on a diff in pull request #13896: Pod template task adapter

Posted by "georgew5656 (via GitHub)" <gi...@apache.org>.
georgew5656 commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1131652703


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java:
##########
@@ -19,15 +19,17 @@
 
 package org.apache.druid.k8s.overlord.common;
 
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.druid.indexing.common.task.Task;
 
 import java.io.IOException;
 
-public interface TaskAdapter<K, V>
+public interface TaskAdapter
 {
 
-  V fromTask(Task task, PeonCommandContext context) throws IOException;

Review Comment:
   I now realize TaskAdapter is actually a K8s-only concept, so this comment is not relevant.
   
   I do find it a little confusing that TaskAdapter is K8s-only, though. I think it would make more sense if the current TaskAdapter were called K8sTaskAdapter and the current K8sTaskAdapter were called K8sFromOverlordTaskAdapter or something similar.





[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #13896: Pod template task adapter

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #13896:
URL: https://github.com/apache/druid/pull/13896#discussion_r1128841861


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java:
##########
@@ -304,4 +327,57 @@
     }
   }
 
+  private List<String> javaOpts(Task task)
+  {
+    final List<String> javaOpts = new ArrayList<>();
+    Iterables.addAll(javaOpts, taskRunnerConfig.javaOptsArray);
+
+    // Override task specific javaOpts
+    Object taskJavaOpts = task.getContextValue(
+        ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
+    );
+    if (taskJavaOpts != null) {
+      Iterables.addAll(
+          javaOpts,
+          new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
+      );
+    }
+
+    javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.port=%d", DruidK8sConstants.PORT));
+    javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.plaintextPort=%d", DruidK8sConstants.PORT));
+    javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.tlsPort=%d", node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1));
+    javaOpts.add(org.apache.druid.java.util.common.StringUtils.format(
+        "-Ddruid.task.executor.tlsPort=%d",
+        node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1
+    ));
+    javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", node.isEnableTlsPort())
+    );
+    return javaOpts;
+  }
+
+  private List<String> generateCommand(Task task)
+  {
+    final List<String> command = new ArrayList<>();
+    command.add("/peon.sh");
+    command.add(new File(taskConfig.getBaseTaskDirPath()).toString());

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [TaskConfig.getBaseTaskDirPath](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4381)
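
   One way to address this alert would be to go back to the list-based accessor that the code removed from `KubernetesTaskRunner` in this PR used, `taskConfig.getBaseTaskDirPaths().get(0)`. The following is a minimal sketch under that assumption; `StubTaskConfig` is a stand-in written for this example, not the real `TaskConfig`, and the paths are made up.

```java
import java.util.Arrays;
import java.util.List;

final class BaseTaskDirExample
{
  /** Stand-in exposing only the list accessor referenced in this PR; not the real TaskConfig. */
  static final class StubTaskConfig
  {
    private final List<String> baseTaskDirPaths;

    StubTaskConfig(List<String> baseTaskDirPaths)
    {
      this.baseTaskDirPaths = baseTaskDirPaths;
    }

    List<String> getBaseTaskDirPaths()
    {
      return baseTaskDirPaths;
    }
  }

  // Takes the first configured base directory, as the removed generateCommand did.
  static String firstBaseTaskDir(StubTaskConfig config)
  {
    return config.getBaseTaskDirPaths().get(0);
  }

  public static void main(String[] args)
  {
    StubTaskConfig config = new StubTaskConfig(Arrays.asList("/tmp/task-a", "/tmp/task-b"));
    System.out.println(firstBaseTaskDir(config));
  }
}
```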



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.apache.druid.server.DruidNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+public class PodTemplateTaskAdapter implements TaskAdapter
+{
+  public static String TYPE = "PodTemplate";
+
+  private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
+  private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";
+
+  private final KubernetesClientApi client;
+  private final KubernetesTaskRunnerConfig taskRunnerConfig;
+  private final TaskConfig taskConfig;
+  private final DruidNode node;
+  private final ObjectMapper mapper;
+  private final HashMap<String, PodTemplate> templates;
+
+  public PodTemplateTaskAdapter(
+      KubernetesClientApi client,
+      KubernetesTaskRunnerConfig taskRunnerConfig,
+      TaskConfig taskConfig,
+      DruidNode node,
+      ObjectMapper mapper,
+      Properties properties
+  )
+  {
+    this.client = client;
+    this.taskRunnerConfig = taskRunnerConfig;
+    this.taskConfig = taskConfig;
+    this.node = node;
+    this.mapper = mapper;
+    this.templates = initializePodTemplates(properties);
+  }
+
+  @Override
+  public Job fromTask(Task task) throws IOException
+  {
+    PodTemplate podTemplate = templates.getOrDefault(task.getType(), templates.get("base"));
+    if (podTemplate == null) {
+      throw new ISE("Pod template spec not found for task type [%s]", task.getType());
+    }
+
+    return new JobBuilder()
+        .withNewMetadata()
+        .withName(new K8sTaskId(task).getK8sTaskId())
+        .addToLabels(getJobLabels(taskRunnerConfig))
+        .addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
+        .endMetadata()
+        .withNewSpec()
+        .withTemplate(podTemplate.getTemplate())
+        .editTemplate()
+        .editOrNewMetadata()
+        .addToAnnotations(getPodTemplateAnnotations(task))
+        .addToLabels(getPodLabels(taskRunnerConfig))
+        .endMetadata()
+        .editSpec()
+        .editFirstContainer()
+        .addAllToEnv(getEnv(task))
+        .endContainer()
+        .endSpec()
+        .endTemplate()
+        .withActiveDeadlineSeconds(taskRunnerConfig.maxTaskDuration.toStandardDuration().getStandardSeconds())
+        .withBackoffLimit(0)
+        .withTtlSecondsAfterFinished((int) taskRunnerConfig.taskCleanupDelay.toStandardDuration().getStandardSeconds())
+        .endSpec()
+        .build();
+  }
+
+  @Override
+  public Task toTask(Pod from) throws IOException
+  {
+    Map<String, String> annotations = from.getMetadata().getAnnotations();
+    if (annotations == null) {
+      throw new IOE("No annotations found on pod [%s]", from.getMetadata().getName());
+    }
+    String task = annotations.get(DruidK8sConstants.TASK);
+    if (task == null) {
+      throw new IOE("No task annotation found on pod [%s]", from.getMetadata().getName());
+    }
+    return mapper.readValue(Base64Compression.decompressBase64(task), Task.class);
+  }
+
+  private HashMap<String, PodTemplate> initializePodTemplates(Properties properties)
+  {
+    HashMap<String, PodTemplate> podTemplateMap = new HashMap<>();
+    Optional<PodTemplate> basePodTemplate = loadPodTemplate("base", properties);
+    if (!basePodTemplate.isPresent()) {
+      throw new IAE("Pod template task adapter requires a base pod template to be specified");
+    }
+    podTemplateMap.put("base", basePodTemplate.get());
+
+    MapperConfig config = mapper.getDeserializationConfig();
+    AnnotatedClass cls = AnnotatedClassResolver.resolveWithoutSuperTypes(config, Task.class);
+    Collection<NamedType> taskSubtypes = mapper.getSubtypeResolver().collectAndResolveSubtypesByClass(config, cls);
+    for (NamedType namedType : taskSubtypes) {
+      String taskType = namedType.getName();
+      Optional<PodTemplate> template = loadPodTemplate(taskType, properties);
+      template.map(podTemplate -> podTemplateMap.put(taskType, podTemplate));
+    }
+    return podTemplateMap;
+  }
+
+  private Optional<PodTemplate> loadPodTemplate(String key, Properties properties)
+  {
+    String property = StringUtils.format(TASK_PROPERTY, key);
+    String podTemplateFile = properties.getProperty(property);
+    if (podTemplateFile == null) {
+      log.debug("Pod template file not specified for [%s]", key);
+      return Optional.empty();
+    }
+    try {
+      return Optional.of(client.executeRequest(client -> client.v1().podTemplates().load(new File(podTemplateFile)).get()));
+    }
+    catch (Exception e) {
+      throw new ISE(e, "Failed to load pod template file for [%s] at [%s]", property, podTemplateFile);
+    }
+  }
+
+  private Collection<EnvVar> getEnv(Task task)

Review Comment:
   ## Useless parameter
   
   The parameter 'task' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4382)


