You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/02 21:48:09 UTC

[GitHub] srkukarni closed pull request #2692: Admission checks

srkukarni closed pull request #2692: Admission checks
URL: https://github.com/apache/pulsar/pull/2692
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 0573d88012..369b5a62f2 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -59,7 +59,7 @@
 class KubernetesRuntime implements Runtime {
 
     private static final String ENV_SHARD_ID = "SHARD_ID";
-    private static final int maxJobNameSize = 63;
+    private static final int maxJobNameSize = 55;
     private static final Integer GRPC_PORT = 9093;
     public static final Pattern VALID_POD_NAME_REGEX =
             Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
@@ -535,7 +535,7 @@ private static String createJobName(String tenant, String namespace, String func
         return "pf-" + tenant + "-" + namespace + "-" + functionName;
     }
 
-    private static void doChecks(Function.FunctionDetails functionDetails) {
+    public static void doChecks(Function.FunctionDetails functionDetails) {
         final String jobName = createJobName(functionDetails);
         if (!jobName.equals(jobName.toLowerCase())) {
             throw new RuntimeException("Kubernetes does not allow upper case jobNames.");
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index c55935b0fe..5b902dba27 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -28,6 +28,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
@@ -129,6 +130,11 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
     public void close() {
     }
 
+    @Override
+    public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
+        KubernetesRuntime.doChecks(functionDetails);
+    }
+
     private void setupClient() throws Exception {
         if (appsClient == null) {
             if (k8Uri == null) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index fd8a7bebb7..ead1ed1c72 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.functions.runtime;
 
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
 
 /**
  * A factory to create {@link Runtime}s to invoke functions.
@@ -38,6 +39,8 @@ Runtime createContainer(
 
     default boolean externallyManaged() { return false; }
 
+    default void doAdmissionChecks(Function.FunctionDetails functionDetails) { }
+
     @Override
     void close();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index b92ae27474..4aa30166d2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -169,6 +169,14 @@ public Response registerFunction(final String tenant, final String namespace, fi
                     .entity(new ErrorData(String.format("Function %s already exists", functionName))).build();
         }
 
+        try {
+            worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+        } catch (Exception e) {
+            log.error("Function {}/{}/{} cannot be admitted by the runtime factory", tenant, namespace, functionName);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", functionName, e.getMessage()))).build();
+        }
+
         // function state
         FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
                 .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
@@ -235,6 +243,14 @@ public Response updateFunction(final String tenant, final String namespace, fina
                     .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
         }
 
+        try {
+            worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+        } catch (Exception e) {
+            log.error("Updated Function {}/{}/{} cannot be submitted to runtime factory", tenant, namespace, functionName);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", functionName, e.getMessage()))).build();
+        }
+
         // function state
         FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
                 .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 8b12d5ed07..3d7620ed42 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -62,11 +62,9 @@
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.Utils;
-import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.io.core.Sink;
@@ -130,6 +128,8 @@ public void write(Record<byte[]> record) throws Exception {
 
     private WorkerService mockedWorkerService;
     private FunctionMetaDataManager mockedManager;
+    private FunctionRuntimeManager mockedFunctionRunTimeManager;
+    private RuntimeFactory mockedRuntimeFactory;
     private Namespace mockedNamespace;
     private FunctionsImpl resource;
     private InputStream mockedInputStream;
@@ -138,6 +138,8 @@ public void write(Record<byte[]> record) throws Exception {
     @BeforeMethod
     public void setup() {
         this.mockedManager = mock(FunctionMetaDataManager.class);
+        this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class);
+        this.mockedRuntimeFactory = mock(RuntimeFactory.class);
         this.mockedInputStream = mock(InputStream.class);
         this.mockedNamespace = mock(Namespace.class);
         this.mockedFormData = mock(FormDataContentDisposition.class);
@@ -145,6 +147,8 @@ public void setup() {
 
         this.mockedWorkerService = mock(WorkerService.class);
         when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+        when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
+        when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services