You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/03 23:50:11 UTC
[pulsar] branch master updated: Separate out FunctionMetadata
related helper functions (#7146)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8638022 Separate out FunctionMetadata related helper functions (#7146)
8638022 is described below
commit 86380223203f0b9faecf4f2fa4d2565d6f92b31c
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Jun 3 16:49:55 2020 -0700
Separate out FunctionMetadata related helper functions (#7146)
* Separate out FunctionMetaData related functions into a utility class
* Fixed bug
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../functions/utils/FunctionMetaDataUtils.java | 79 ++++++++++++++++++
.../functions/utils/FunctionMetaDataUtilsTest.java | 95 ++++++++++++++++++++++
.../functions/worker/FunctionMetaDataManager.java | 71 +++-------------
.../functions/worker/rest/api/ComponentImpl.java | 5 +-
.../worker/FunctionMetaDataManagerTest.java | 9 +-
5 files changed, 193 insertions(+), 66 deletions(-)
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java
new file mode 100644
index 0000000..530b887
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import org.apache.pulsar.functions.proto.Function;
+
+/**
+ * Static helpers for inspecting {@link Function.FunctionMetaData} and for deriving new
+ * metadata objects from existing ones.
+ *
+ * <p>Methods that produce metadata do so through {@code toBuilder()} / {@code build()} and
+ * return the newly built object.
+ */
+public final class FunctionMetaDataUtils {
+
+    /** Not instantiable: this class only hosts static helpers. */
+    private FunctionMetaDataUtils() {
+    }
+
+    /**
+     * Checks whether a requested state transition would change anything.
+     *
+     * @param functionMetaData metadata of the function being inspected
+     * @param instanceId       index of the instance to change; a negative value means
+     *                         "all instances"
+     * @param newState         the state the caller wants to move to
+     * @return {@code false} when {@code instanceId} is not below the configured parallelism;
+     *         when no instance states are recorded, {@code true} only for
+     *         {@link Function.FunctionState#STOPPED}; for a non-negative {@code instanceId},
+     *         {@code true} only when a state is recorded for it and differs from
+     *         {@code newState}; for a negative {@code instanceId}, {@code true} when at least
+     *         one recorded state differs from {@code newState}
+     */
+    public static boolean canChangeState(Function.FunctionMetaData functionMetaData, int instanceId,
+                                         Function.FunctionState newState) {
+        if (instanceId >= functionMetaData.getFunctionDetails().getParallelism()) {
+            return false;
+        }
+        if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) {
+            // This means that all instances of the functions are running
+            return newState == Function.FunctionState.STOPPED;
+        }
+        if (instanceId >= 0) {
+            if (functionMetaData.getInstanceStatesMap().containsKey(instanceId)) {
+                return functionMetaData.getInstanceStatesMap().get(instanceId) != newState;
+            } else {
+                return false;
+            }
+        } else {
+            // want to change state for all instances
+            for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) {
+                if (state != newState) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Builds a copy of the metadata with the version incremented by one and the requested
+     * instance state(s) applied.
+     *
+     * <p>If no instance states are recorded yet, every index in {@code [0, parallelism)} is
+     * first recorded as {@link Function.FunctionState#RUNNING}. A negative {@code instanceId}
+     * then sets the target state on every index; a non-negative {@code instanceId} below the
+     * parallelism sets it on that index only. An {@code instanceId} at or above the
+     * parallelism leaves the states untouched, but the version is still incremented.
+     *
+     * @param functionMetaData metadata to derive the result from
+     * @param instanceId       index of the instance to change, or a negative value for all
+     * @param start            {@code true} for RUNNING, {@code false} for STOPPED
+     * @return the newly built metadata
+     * @throws NullPointerException if {@code instanceId} is {@code null} (it is unboxed)
+     */
+    public static Function.FunctionMetaData changeFunctionInstanceStatus(Function.FunctionMetaData functionMetaData,
+                                                                         Integer instanceId, boolean start) {
+        // The builder is created from functionMetaData and its details are never modified here,
+        // so a single parallelism lookup serves every bound check below.
+        int parallelism = functionMetaData.getFunctionDetails().getParallelism();
+        Function.FunctionMetaData.Builder builder = functionMetaData.toBuilder()
+                .setVersion(functionMetaData.getVersion() + 1);
+        if (builder.getInstanceStatesMap() == null || builder.getInstanceStatesMap().isEmpty()) {
+            for (int i = 0; i < parallelism; ++i) {
+                builder.putInstanceStates(i, Function.FunctionState.RUNNING);
+            }
+        }
+        Function.FunctionState state = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED;
+        if (instanceId < 0) {
+            for (int i = 0; i < parallelism; ++i) {
+                builder.putInstanceStates(i, state);
+            }
+        } else if (instanceId < parallelism) {
+            builder.putInstanceStates(instanceId, state);
+        }
+        return builder.build();
+    }
+
+    /**
+     * Stamps {@code updatedMetaData} with the version that follows {@code existingMetaData}.
+     *
+     * @param existingMetaData metadata currently known for the function, or {@code null} if
+     *                         there is none
+     * @param updatedMetaData  metadata carrying the new content
+     * @return a copy of {@code updatedMetaData} whose version is {@code 0} when
+     *         {@code existingMetaData} is {@code null}, otherwise the existing version plus one
+     */
+    public static Function.FunctionMetaData generateUpdatedMetadata(Function.FunctionMetaData existingMetaData,
+                                                                    Function.FunctionMetaData updatedMetaData) {
+        long version = 0;
+        if (existingMetaData != null) {
+            version = existingMetaData.getVersion() + 1;
+        }
+        return updatedMetaData.toBuilder()
+                .setVersion(version)
+                .build();
+    }
+}
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java
new file mode 100644
index 0000000..d708aca
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Exercises the static helpers of {@link FunctionMetaDataUtils}: state-change checks,
+ * instance state changes, and version stamping on update.
+ */
+public class FunctionMetaDataUtilsTest {
+
+    /** Builds metadata for a function named "func-1" with the given parallelism and version. */
+    private static Function.FunctionMetaData buildMetaData(int parallelism, long version) {
+        Function.FunctionDetails.Builder details =
+                Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(parallelism);
+        return Function.FunctionMetaData.newBuilder()
+                .setFunctionDetails(details)
+                .setVersion(version)
+                .build();
+    }
+
+    /** Asserts a two-instance state map with the given per-instance states and version. */
+    private static void assertTwoInstanceStates(Function.FunctionMetaData actual,
+                                                Function.FunctionState expectedFirst,
+                                                Function.FunctionState expectedSecond,
+                                                long expectedVersion) {
+        Assert.assertTrue(actual.getInstanceStatesMap() != null);
+        Assert.assertEquals(actual.getInstanceStatesMap().size(), 2);
+        Assert.assertEquals(actual.getInstanceStatesMap().get(0), expectedFirst);
+        Assert.assertEquals(actual.getInstanceStatesMap().get(1), expectedSecond);
+        Assert.assertEquals(actual.getVersion(), expectedVersion);
+    }
+
+    @Test
+    public void testCanChangeState() {
+        Function.FunctionMetaData fixture = buildMetaData(2, 5);
+
+        // No states recorded yet: only a stop of a valid instance is a real change.
+        Assert.assertTrue(FunctionMetaDataUtils.canChangeState(fixture, 0, Function.FunctionState.STOPPED));
+        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(fixture, 0, Function.FunctionState.RUNNING));
+        // Instance 2 is outside a parallelism of 2.
+        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(fixture, 2, Function.FunctionState.STOPPED));
+        Assert.assertFalse(FunctionMetaDataUtils.canChangeState(fixture, 2, Function.FunctionState.RUNNING));
+    }
+
+    @Test
+    public void testChangeState() {
+        final long baseVersion = 5;
+        Function.FunctionMetaData current = buildMetaData(2, baseVersion);
+
+        // Stop instance 0; instance 1 is recorded as running.
+        current = FunctionMetaDataUtils.changeFunctionInstanceStatus(current, 0, false);
+        assertTwoInstanceStates(current,
+                Function.FunctionState.STOPPED, Function.FunctionState.RUNNING, baseVersion + 1);
+
+        // Nothing should happen
+        current = FunctionMetaDataUtils.changeFunctionInstanceStatus(current, 3, false);
+        assertTwoInstanceStates(current,
+                Function.FunctionState.STOPPED, Function.FunctionState.RUNNING, baseVersion + 2);
+
+        // Change one more
+        current = FunctionMetaDataUtils.changeFunctionInstanceStatus(current, 1, false);
+        assertTwoInstanceStates(current,
+                Function.FunctionState.STOPPED, Function.FunctionState.STOPPED, baseVersion + 3);
+
+        // Change all more
+        current = FunctionMetaDataUtils.changeFunctionInstanceStatus(current, -1, true);
+        assertTwoInstanceStates(current,
+                Function.FunctionState.RUNNING, Function.FunctionState.RUNNING, baseVersion + 4);
+    }
+
+    @Test
+    public void testUpdate() {
+        final long baseVersion = 5;
+        Function.FunctionMetaData existing = buildMetaData(2, baseVersion);
+        Function.FunctionMetaData incoming = buildMetaData(3, baseVersion);
+
+        // With an existing entry the result carries the next version and the incoming content.
+        Function.FunctionMetaData result = FunctionMetaDataUtils.generateUpdatedMetadata(existing, incoming);
+        Assert.assertEquals(result.getVersion(), baseVersion + 1);
+        Assert.assertEquals(result.getFunctionDetails().getParallelism(), 3);
+
+        // Without an existing entry the version starts at zero.
+        result = FunctionMetaDataUtils.generateUpdatedMetadata(null, result);
+        Assert.assertEquals(result.getVersion(), 0);
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 6680015..c3273fa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.request.ServiceRequestInfo;
import org.apache.pulsar.functions.worker.request.ServiceRequestManager;
@@ -175,26 +176,16 @@ public class FunctionMetaDataManager implements AutoCloseable {
*/
public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
- long version = 0;
-
- String tenant = functionMetaData.getFunctionDetails().getTenant();
- if (!this.functionMetaDataMap.containsKey(tenant)) {
- this.functionMetaDataMap.put(tenant, new ConcurrentHashMap<>());
- }
-
- Map<String, Map<String, FunctionMetaData>> namespaces = this.functionMetaDataMap.get(tenant);
- String namespace = functionMetaData.getFunctionDetails().getNamespace();
- if (!namespaces.containsKey(namespace)) {
- namespaces.put(namespace, new ConcurrentHashMap<>());
- }
-
- Map<String, FunctionMetaData> functionMetaDatas = namespaces.get(namespace);
- String functionName = functionMetaData.getFunctionDetails().getName();
- if (functionMetaDatas.containsKey(functionName)) {
- version = functionMetaDatas.get(functionName).getVersion() + 1;
+ FunctionMetaData existingFunctionMetadata = null;
+ if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
+ functionMetaData.getFunctionDetails().getNamespace(),
+ functionMetaData.getFunctionDetails().getName())) {
+ existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
+ functionMetaData.getFunctionDetails().getNamespace(),
+ functionMetaData.getFunctionDetails().getName());
}
- FunctionMetaData newFunctionMetaData = functionMetaData.toBuilder().setVersion(version).build();
+ FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
this.workerConfig.getWorkerId(), newFunctionMetaData);
@@ -213,9 +204,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
- FunctionMetaData newFunctionMetaData = functionMetaData.toBuilder()
- .setVersion(functionMetaData.getVersion() + 1)
- .build();
+ FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
this.workerConfig.getWorkerId(), newFunctionMetaData);
@@ -236,22 +225,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
Integer instanceId, boolean start) {
FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
- FunctionMetaData.Builder builder = functionMetaData.toBuilder()
- .setVersion(functionMetaData.getVersion() + 1);
- if (builder.getInstanceStatesMap() == null || builder.getInstanceStatesMap().isEmpty()) {
- for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) {
- builder.putInstanceStates(i, Function.FunctionState.RUNNING);
- }
- }
- Function.FunctionState state = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED;
- if (instanceId < 0) {
- for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) {
- builder.putInstanceStates(i, state);
- }
- } else {
- builder.putInstanceStates(instanceId, state);
- }
- FunctionMetaData newFunctionMetaData = builder.build();
+ FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
this.workerConfig.getWorkerId(), newFunctionMetaData);
@@ -457,29 +431,6 @@ public class FunctionMetaDataManager implements AutoCloseable {
}
}
- public boolean canChangeState(FunctionMetaData functionMetaData, int instanceId, Function.FunctionState newState) {
- if (instanceId >= functionMetaData.getFunctionDetails().getParallelism()) {
- return false;
- }
- if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) {
- // This means that all instances of the functions are running
- return newState == Function.FunctionState.STOPPED;
- }
- if (instanceId >= 0) {
- if (functionMetaData.getInstanceStatesMap().containsKey(instanceId)) {
- return functionMetaData.getInstanceStatesMap().get(instanceId) != newState;
- } else {
- return false;
- }
- } else {
- // want to change state for all instances
- for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) {
- if (state != newState) return true;
- }
- return false;
- }
- }
-
private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
return new ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index cad442f..0b128d9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -528,7 +529,7 @@ public abstract class ComponentImpl {
throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
- if (!functionMetaDataManager.canChangeState(functionMetaData, Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
+ if (!FunctionMetaDataUtils.canChangeState(functionMetaData, Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName);
throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted"));
}
@@ -656,7 +657,7 @@ public abstract class ComponentImpl {
throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
- if (!functionMetaDataManager.canChangeState(functionMetaData, -1, start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
+ if (!FunctionMetaDataUtils.canChangeState(functionMetaData, -1, start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName);
throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted"));
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index f16bd3f..75be623 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -182,10 +183,10 @@ public class FunctionMetaDataManagerTest {
Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
functionMetaDataMap1.put("func-1", f1);
- Assert.assertTrue(functionMetaDataManager.canChangeState(f1, 0, Function.FunctionState.STOPPED));
- Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 0, Function.FunctionState.RUNNING));
- Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2, Function.FunctionState.STOPPED));
- Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2, Function.FunctionState.RUNNING));
+ Assert.assertTrue(FunctionMetaDataUtils.canChangeState(f1, 0, Function.FunctionState.STOPPED));
+ Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 0, Function.FunctionState.RUNNING));
+ Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 2, Function.FunctionState.STOPPED));
+ Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 2, Function.FunctionState.RUNNING));
functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap1);