You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/03 23:50:11 UTC

[pulsar] branch master updated: Separate out FunctionMetadata related helper functions (#7146)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8638022  Separate out FunctionMetadata related helper functions (#7146)
8638022 is described below

commit 86380223203f0b9faecf4f2fa4d2565d6f92b31c
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Jun 3 16:49:55 2020 -0700

    Separate out FunctionMetadata related helper functions (#7146)
    
    * Seperate out FunctionMetaData related functions into a utility class
    
    * Fixed bug
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../functions/utils/FunctionMetaDataUtils.java     | 79 ++++++++++++++++++
 .../functions/utils/FunctionMetaDataUtilsTest.java | 95 ++++++++++++++++++++++
 .../functions/worker/FunctionMetaDataManager.java  | 71 +++-------------
 .../functions/worker/rest/api/ComponentImpl.java   |  5 +-
 .../worker/FunctionMetaDataManagerTest.java        |  9 +-
 5 files changed, 193 insertions(+), 66 deletions(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java
new file mode 100644
index 0000000..530b887
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import org.apache.pulsar.functions.proto.Function;
+
+public class FunctionMetaDataUtils {
+
+    public static boolean canChangeState(Function.FunctionMetaData functionMetaData, int instanceId, Function.FunctionState newState) {
+        if (instanceId >= functionMetaData.getFunctionDetails().getParallelism()) {
+            return false;
+        }
+        if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) {
+            // This means that all instances of the functions are running
+            return newState == Function.FunctionState.STOPPED;
+        }
+        if (instanceId >= 0) {
+            if (functionMetaData.getInstanceStatesMap().containsKey(instanceId)) {
+                return functionMetaData.getInstanceStatesMap().get(instanceId) != newState;
+            } else {
+                return false;
+            }
+        } else {
+            // want to change state for all instances
+            for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) {
+                if (state != newState) return true;
+            }
+            return false;
+        }
+    }
+
+    public static Function.FunctionMetaData changeFunctionInstanceStatus(Function.FunctionMetaData functionMetaData,
+                                                                         Integer instanceId, boolean start) {
+        Function.FunctionMetaData.Builder builder = functionMetaData.toBuilder()
+                .setVersion(functionMetaData.getVersion() + 1);
+        if (builder.getInstanceStatesMap() == null || builder.getInstanceStatesMap().isEmpty()) {
+            for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) {
+                builder.putInstanceStates(i, Function.FunctionState.RUNNING);
+            }
+        }
+        Function.FunctionState state = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED;
+        if (instanceId < 0) {
+            for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) {
+                builder.putInstanceStates(i, state);
+            }
+        } else if (instanceId < builder.getFunctionDetails().getParallelism()){
+            builder.putInstanceStates(instanceId, state);
+        }
+        return builder.build();
+    }
+
+    public static Function.FunctionMetaData generateUpdatedMetadata(Function.FunctionMetaData existingMetaData,
+                                                                    Function.FunctionMetaData updatedMetaData) {
+        long version = 0;
+        if (existingMetaData != null) {
+            version = existingMetaData.getVersion() + 1;
+        }
+        return updatedMetaData.toBuilder()
+                .setVersion(version)
+                .build();
+    }
+}
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java
new file mode 100644
index 0000000..d708aca
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test of {@link FunctionMetaDataUtils}.
+ */
+public class FunctionMetaDataUtilsTest {
+
+    @Test
+    public void testCanChangeState() {
+
+        long version = 5;
+        Function.FunctionMetaData metaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
+
+        Assert.assertTrue(FunctionMetaDataUtils.canChangeState(metaData, 0, Function.FunctionState.STOPPED));
+        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(metaData, 0, Function.FunctionState.RUNNING));
+        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(metaData, 2, Function.FunctionState.STOPPED));
+        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(metaData, 2, Function.FunctionState.RUNNING));
+    }
+
+    @Test
+    public void testChangeState() {
+        long version = 5;
+        Function.FunctionMetaData metaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
+        Function.FunctionMetaData newMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(metaData, 0, false);
+        Assert.assertTrue(newMetaData.getInstanceStatesMap() != null);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().size(), 2);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().get(0), Function.FunctionState.STOPPED);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().get(1), Function.FunctionState.RUNNING);
+        Assert.assertEquals(newMetaData.getVersion(), version + 1);
+
+        // Nothing should happen
+        newMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(newMetaData, 3, false);
+        Assert.assertTrue(newMetaData.getInstanceStatesMap() != null);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().size(), 2);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().get(0), Function.FunctionState.STOPPED);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().get(1), Function.FunctionState.RUNNING);
+        Assert.assertEquals(newMetaData.getVersion(), version + 2);
+
+        // Change one more
+        newMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(newMetaData, 1, false);
+        Assert.assertTrue(newMetaData.getInstanceStatesMap() != null);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().size(), 2);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().get(0), Function.FunctionState.STOPPED);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().get(1), Function.FunctionState.STOPPED);
+        Assert.assertEquals(newMetaData.getVersion(), version + 3);
+
+        // Change all more
+        newMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(newMetaData, -1, true);
+        Assert.assertTrue(newMetaData.getInstanceStatesMap() != null);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().size(), 2);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().get(0), Function.FunctionState.RUNNING);
+        Assert.assertEquals(newMetaData.getInstanceStatesMap().get(1), Function.FunctionState.RUNNING);
+        Assert.assertEquals(newMetaData.getVersion(), version + 4);
+    }
+
+    @Test
+    public void testUpdate() {
+        long version = 5;
+        Function.FunctionMetaData existingMetaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
+        Function.FunctionMetaData updatedMetaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(3)).setVersion(version).build();
+        Function.FunctionMetaData newMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingMetaData, updatedMetaData);
+        Assert.assertEquals(newMetaData.getVersion(), version + 1);
+        Assert.assertEquals(newMetaData.getFunctionDetails().getParallelism(), 3);
+
+        newMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(null, newMetaData);
+        Assert.assertEquals(newMetaData.getVersion(), 0);
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 6680015..c3273fa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.request.ServiceRequestInfo;
 import org.apache.pulsar.functions.worker.request.ServiceRequestManager;
@@ -175,26 +176,16 @@ public class FunctionMetaDataManager implements AutoCloseable {
      */
     public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
 
-        long version = 0;
-
-        String tenant = functionMetaData.getFunctionDetails().getTenant();
-        if (!this.functionMetaDataMap.containsKey(tenant)) {
-            this.functionMetaDataMap.put(tenant, new ConcurrentHashMap<>());
-        }
-
-        Map<String, Map<String, FunctionMetaData>> namespaces = this.functionMetaDataMap.get(tenant);
-        String namespace = functionMetaData.getFunctionDetails().getNamespace();
-        if (!namespaces.containsKey(namespace)) {
-            namespaces.put(namespace, new ConcurrentHashMap<>());
-        }
-
-        Map<String, FunctionMetaData> functionMetaDatas = namespaces.get(namespace);
-        String functionName = functionMetaData.getFunctionDetails().getName();
-        if (functionMetaDatas.containsKey(functionName)) {
-            version = functionMetaDatas.get(functionName).getVersion() + 1;
+        FunctionMetaData existingFunctionMetadata = null;
+        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
+                functionMetaData.getFunctionDetails().getNamespace(),
+                functionMetaData.getFunctionDetails().getName())) {
+            existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
+                    functionMetaData.getFunctionDetails().getNamespace(),
+                    functionMetaData.getFunctionDetails().getName());
         }
 
-        FunctionMetaData newFunctionMetaData = functionMetaData.toBuilder().setVersion(version).build();
+        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
 
         Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
                 this.workerConfig.getWorkerId(), newFunctionMetaData);
@@ -213,9 +204,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
     public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
         FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
 
-        FunctionMetaData newFunctionMetaData = functionMetaData.toBuilder()
-                .setVersion(functionMetaData.getVersion() + 1)
-                .build();
+        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
 
         Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
                 this.workerConfig.getWorkerId(), newFunctionMetaData);
@@ -236,22 +225,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
                                                                                       Integer instanceId, boolean start) {
         FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
 
-        FunctionMetaData.Builder builder = functionMetaData.toBuilder()
-                .setVersion(functionMetaData.getVersion() + 1);
-        if (builder.getInstanceStatesMap() == null || builder.getInstanceStatesMap().isEmpty()) {
-            for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) {
-                builder.putInstanceStates(i, Function.FunctionState.RUNNING);
-            }
-        }
-        Function.FunctionState state = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED;
-        if (instanceId < 0) {
-            for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) {
-                builder.putInstanceStates(i, state);
-            }
-        } else {
-            builder.putInstanceStates(instanceId, state);
-        }
-        FunctionMetaData newFunctionMetaData = builder.build();
+        FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
 
         Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
                 this.workerConfig.getWorkerId(), newFunctionMetaData);
@@ -457,29 +431,6 @@ public class FunctionMetaDataManager implements AutoCloseable {
         }
     }
 
-    public boolean canChangeState(FunctionMetaData functionMetaData, int instanceId, Function.FunctionState newState) {
-        if (instanceId >= functionMetaData.getFunctionDetails().getParallelism()) {
-            return false;
-        }
-        if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) {
-            // This means that all instances of the functions are running
-            return newState == Function.FunctionState.STOPPED;
-        }
-        if (instanceId >= 0) {
-            if (functionMetaData.getInstanceStatesMap().containsKey(instanceId)) {
-                return functionMetaData.getInstanceStatesMap().get(instanceId) != newState;
-            } else {
-                return false;
-            }
-        } else {
-            // want to change state for all instances
-            for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) {
-                if (state != newState) return true;
-            }
-            return false;
-        }
-    }
-
     private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
         return new ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index cad442f..0b128d9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -528,7 +529,7 @@ public abstract class ComponentImpl {
             throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
-        if (!functionMetaDataManager.canChangeState(functionMetaData, Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
+        if (!FunctionMetaDataUtils.canChangeState(functionMetaData, Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
             log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName);
             throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted"));
         }
@@ -656,7 +657,7 @@ public abstract class ComponentImpl {
             throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
-        if (!functionMetaDataManager.canChangeState(functionMetaData, -1, start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
+        if (!FunctionMetaDataUtils.canChangeState(functionMetaData, -1, start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
             log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName);
             throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted"));
         }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index f16bd3f..75be623 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -182,10 +183,10 @@ public class FunctionMetaDataManagerTest {
                 Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
         functionMetaDataMap1.put("func-1", f1);
 
-        Assert.assertTrue(functionMetaDataManager.canChangeState(f1, 0, Function.FunctionState.STOPPED));
-        Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 0, Function.FunctionState.RUNNING));
-        Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2, Function.FunctionState.STOPPED));
-        Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2, Function.FunctionState.RUNNING));
+        Assert.assertTrue(FunctionMetaDataUtils.canChangeState(f1, 0, Function.FunctionState.STOPPED));
+        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 0, Function.FunctionState.RUNNING));
+        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 2, Function.FunctionState.STOPPED));
+        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 2, Function.FunctionState.RUNNING));
 
         functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
         functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap1);