You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/26 14:49:12 UTC
[pulsar] branch master updated: Re-work Function MetaDataManager to
make all metadata writes only by the leader (#7255)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c83a656 Re-work Function MetaDataManager to make all metadata writes only by the leader (#7255)
c83a656 is described below
commit c83a6563f9e0c71d7a5fd62fdece3e29128bbae0
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Jun 26 07:49:02 2020 -0700
Re-work Function MetaDataManager to make all metadata writes only by the leader (#7255)
* Function workers re-direct call update requests to the leader
* Fixed test
* tests pass
* Working version
* Fix test
* Short circuit update
* Fix test
* Fix test
* Fix tests
* Added one more catch
* Added one more catch
* Seperated internal and external errors
* Fix test
* Address feedback
* Do not expose updateOnLeader to functions
* hide api
* hide api
* removed duplicate comments
* Do leadership changes in function metadata manager
* make the function sync
* Added more comments
* Throw error
* Changed name
* address comments
* Deleted unused classes
* Rework metadata manager
* Working
* Fix test
* A better way for test
* Address feedback
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 22 +
.../worker/PulsarFunctionE2ESecurityTest.java | 2 +-
.../worker/PulsarFunctionPublishTest.java | 1 +
.../client/admin/internal/FunctionsImpl.java | 48 +++
.../functions/utils/FunctionMetaDataUtils.java | 4 +-
.../functions/utils/FunctionMetaDataUtilsTest.java | 4 +-
.../functions/worker/FunctionMetaDataManager.java | 356 ++++++++--------
.../worker/FunctionMetaDataTopicTailer.java | 71 +++-
.../pulsar/functions/worker/LeaderService.java | 5 +
.../pulsar/functions/worker/WorkerService.java | 1 +
.../functions/worker/request/RequestResult.java | 40 --
.../worker/request/ServiceRequestInfo.java | 46 ---
.../worker/request/ServiceRequestManager.java | 53 ---
.../worker/request/ServiceRequestUtils.java | 61 ---
.../functions/worker/rest/api/ComponentImpl.java | 97 ++---
.../functions/worker/rest/api/FunctionsImpl.java | 60 ++-
.../functions/worker/rest/api/SinksImpl.java | 4 +-
.../functions/worker/rest/api/SourcesImpl.java | 4 +-
.../worker/rest/api/v3/FunctionsApiV3Resource.java | 22 +
.../worker/FunctionMetaDataManagerTest.java | 449 ++++++---------------
.../worker/FunctionMetaDataTopicTailerTest.java | 16 +-
.../worker/request/ServiceRequestManagerTest.java | 70 ----
.../rest/api/v2/FunctionApiV2ResourceTest.java | 122 ++----
.../rest/api/v3/FunctionApiV3ResourceTest.java | 118 ++----
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 107 ++---
.../rest/api/v3/SourceApiV3ResourceTest.java | 105 ++---
26 files changed, 699 insertions(+), 1189 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 0529c04..f77528a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -47,6 +47,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
@@ -692,4 +693,25 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public List<ConnectorDefinition> getConnectorsList() throws IOException {
return functions.getListOfConnectors();
}
+
+ @PUT
+ @ApiOperation(value = "Updates a Pulsar Function on the worker leader", hidden = true)
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have super-user permissions"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 307, message = "Redirecting to the worker leader"),
+ @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+ })
+ @Path("/leader/{tenant}/{namespace}/{functionName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("functionMetaData") InputStream uploadedInputStream,
+ final @FormDataParam("delete") boolean delete) {
+
+ functions.updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
+ delete, uri.getRequestUri(), clientAppId());
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 48c40f3..03f9bd7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -201,7 +201,7 @@ public class PulsarFunctionE2ESecurityTest {
superUserAdmin.tenants().createTenant(TENANT2, propAdmin);
superUserAdmin.namespaces().createNamespace( TENANT2 + "/" + NAMESPACE);
- Thread.sleep(100);
+ functionWorkerService.get().getLeaderService().waitLeaderInit();
}
@AfterMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index cce8ce8..90882bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -206,6 +206,7 @@ public class PulsarFunctionPublishTest {
System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+ functionWorkerService.get().getLeaderService().waitLeaderInit();
}
@AfterMethod
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 0e081d4..577f635 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -63,6 +63,7 @@ import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.request.body.multipart.ByteArrayPart;
import org.asynchttpclient.request.body.multipart.FilePart;
import org.asynchttpclient.request.body.multipart.StringPart;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
@@ -984,4 +985,51 @@ public class FunctionsImpl extends ComponentResource implements Functions {
}
return future;
}
+
+ public void updateOnWorkerLeader(String tenant, String namespace,
+ String function, byte[] functionMetaData,
+ boolean delete) throws PulsarAdminException {
+ try {
+ updateOnWorkerLeaderAsync(tenant, namespace, function,
+ functionMetaData, delete).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String namespace,
+ String function, byte[] functionMetaData,
+ boolean delete) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
+ RequestBuilder builder =
+ put(functions.path("leader").path(tenant).path(namespace)
+ .path(function).getUri().toASCIIString())
+ .addBodyPart(new ByteArrayPart("functionMetaData", functionMetaData))
+ .addBodyPart(new StringPart("delete", Boolean.toString(delete)));
+
+ asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
+ .toCompletableFuture()
+ .thenAccept(response -> {
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ future.completeExceptionally(
+ getApiException(Response
+ .status(response.getStatusCode())
+ .entity(response.getResponseBody())
+ .build()));
+ } else {
+ future.complete(null);
+ }
+ });
+
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java
index 530b887..46766ef 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtils.java
@@ -66,8 +66,8 @@ public class FunctionMetaDataUtils {
return builder.build();
}
- public static Function.FunctionMetaData generateUpdatedMetadata(Function.FunctionMetaData existingMetaData,
- Function.FunctionMetaData updatedMetaData) {
+ public static Function.FunctionMetaData incrMetadataVersion(Function.FunctionMetaData existingMetaData,
+ Function.FunctionMetaData updatedMetaData) {
long version = 0;
if (existingMetaData != null) {
version = existingMetaData.getVersion() + 1;
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java
index d708aca..4e09c05 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionMetaDataUtilsTest.java
@@ -85,11 +85,11 @@ public class FunctionMetaDataUtilsTest {
Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
Function.FunctionMetaData updatedMetaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(3)).setVersion(version).build();
- Function.FunctionMetaData newMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingMetaData, updatedMetaData);
+ Function.FunctionMetaData newMetaData = FunctionMetaDataUtils.incrMetadataVersion(existingMetaData, updatedMetaData);
Assert.assertEquals(newMetaData.getVersion(), version + 1);
Assert.assertEquals(newMetaData.getFunctionDetails().getParallelism(), 3);
- newMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(null, newMetaData);
+ newMetaData = FunctionMetaDataUtils.incrMetadataVersion(null, newMetaData);
Assert.assertEquals(newMetaData.getVersion(), 0);
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 0c4779e..795a735 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -19,31 +19,37 @@
package org.apache.pulsar.functions.worker;
import com.google.common.annotations.VisibleForTesting;
-import lombok.Getter;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import lombok.Getter;
+import org.apache.pulsar.client.api.*;
import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Request;
-import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.request.ServiceRequestInfo;
-import org.apache.pulsar.functions.worker.request.ServiceRequestManager;
-import org.apache.pulsar.functions.worker.request.ServiceRequestUtils;
+import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
- * This class maintains a global state of all function metadata and is responsible for serving function metadata
+ * FunctionMetaDataManager maintains a global state of all function metadata.
+ * It is the system of record for the worker for function metadata.
+ * FunctionMetaDataManager operates in either the leader mode or worker mode.
+ * By default, when you initialize and start manager, it starts in the worker mode.
+ * In the worker mode, the FunctionMetaDataTailer tails the function metadata topic
+ * and updates the in-memory metadata cache.
+ * When the worker becomes a leader, it calls the acquireLeadaership thru which
+ * the FunctionMetaData Manager switches to a leader mode. In the leader mode
+ * the manager first captures an exclusive producer on the the metadata topic.
+ * Then it drains the MetaDataTailer to ensure that it has caught up to the last record.
+ * After this point, the worker can update the in-memory state of function metadata
+ * by calling processUpdate/processDeregister methods.
+ * If a worker loses its leadership, it calls giveupLeaderShip at which time the
+ * manager closes its exclusive producer and starts its tailer again.
*/
@Slf4j
public class FunctionMetaDataManager implements AutoCloseable {
@@ -52,23 +58,20 @@ public class FunctionMetaDataManager implements AutoCloseable {
@VisibleForTesting
final Map<String, Map<String, Map<String, FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
- // A map in which the key is the service request id and value is the service request
- private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();
-
- private final ServiceRequestManager serviceRequestManager;
private final SchedulerManager schedulerManager;
private final WorkerConfig workerConfig;
private final PulsarClient pulsarClient;
private final ErrorNotifier errorNotifier;
private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
-
- @Setter
- @Getter
- boolean isInitializePhase = false;
+ // The producer of the metadata topic when we are the leader.
+ // Note that this variable serves a double duty. A non-null value
+ // implies we are the leader, while a null value means we are not the leader
+ private Producer exclusiveLeaderProducer;
+ private MessageId lastMessageSeen = MessageId.earliest;
@Getter
- private final CompletableFuture<Void> isInitialized = new CompletableFuture<>();
+ private CompletableFuture<Void> isInitialized = new CompletableFuture<>();
public FunctionMetaDataManager(WorkerConfig workerConfig,
SchedulerManager schedulerManager,
@@ -76,10 +79,9 @@ public class FunctionMetaDataManager implements AutoCloseable {
ErrorNotifier errorNotifier) throws PulsarClientException {
this.workerConfig = workerConfig;
this.pulsarClient = pulsarClient;
- this.serviceRequestManager = getServiceRequestManager(
- this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
this.schedulerManager = schedulerManager;
this.errorNotifier = errorNotifier;
+ exclusiveLeaderProducer = null;
}
/**
@@ -87,32 +89,44 @@ public class FunctionMetaDataManager implements AutoCloseable {
*/
/**
- * Initializes the FunctionMetaDataManager. Does the following:
- * 1. Consume all existing function meta data upon start to establish existing state
+ * Initializes the FunctionMetaDataManager.
+ * We create a new reader
*/
- public void initialize() {
+ public synchronized void initialize() {
try {
- this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
- pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
// read all existing messages
- this.setInitializePhase(true);
- while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
- this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
+ Reader reader = FunctionMetaDataTopicTailer.createReader(workerConfig, pulsarClient.newReader(), MessageId.earliest);
+ while (reader.hasMessageAvailable()) {
+ processMetaDataTopicMessage(reader.readNext());
}
- this.setInitializePhase(false);
-
this.isInitialized.complete(null);
} catch (Exception e) {
log.error("Failed to initialize meta data store", e);
- throw new RuntimeException(e);
+ throw new RuntimeException("Failed to initialize Metadata Manager", e);
}
+ log.info("FunctionMetaData Manager initialization complete");
}
-
- public void start() {
- // schedule functions if necessary
- this.schedulerManager.schedule();
- // start function metadata tailer
- this.functionMetaDataTopicTailer.start();
+
+ // Starts the tailer if we are in non-leader mode
+ public synchronized void start() {
+ if (exclusiveLeaderProducer == null) {
+ try {
+ // This means that we are in non-leader mode. start function metadata tailer
+ initializeTailer();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException("Could not start MetaData topic tailer", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (this.functionMetaDataTopicTailer != null) {
+ this.functionMetaDataTopicTailer.close();
+ }
+ if (this.exclusiveLeaderProducer != null) {
+ this.exclusiveLeaderProducer.close();
+ }
}
/**
@@ -174,89 +188,126 @@ public class FunctionMetaDataManager implements AutoCloseable {
}
/**
- * Sends an update request to the FMT (Function Metadata Topic)
- * @param functionMetaData The function metadata that needs to be updated
- * @return a completable future of when the update has been applied
+ * Called by the worker when we are in the leader mode. In this state, we update our in-memory
+ * data structures and then write to the metadata topic.
+ * @param functionMetaData The function metadata in question
+ * @param delete Is this a delete operation
+ * @throws IllegalStateException if we are not the leader
+ * @throws IllegalArgumentException if the request is out of date.
*/
- public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {
-
- FunctionMetaData existingFunctionMetadata = null;
- if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
- functionMetaData.getFunctionDetails().getNamespace(),
- functionMetaData.getFunctionDetails().getName())) {
- existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
- functionMetaData.getFunctionDetails().getNamespace(),
- functionMetaData.getFunctionDetails().getName());
+ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete)
+ throws IllegalStateException, IllegalArgumentException {
+ if (exclusiveLeaderProducer == null) {
+ throw new IllegalStateException("Not the leader");
+ }
+ boolean needsScheduling;
+ if (delete) {
+ needsScheduling = proccessDeregister(functionMetaData);
+ } else {
+ needsScheduling = processUpdate(functionMetaData);
+ }
+ Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+ .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+ .setFunctionMetaData(functionMetaData)
+ .setWorkerId(workerConfig.getWorkerId())
+ .setRequestId(UUID.randomUUID().toString())
+ .build();
+ try {
+ lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+ } catch (Exception e) {
+ log.error("Could not write into Function Metadata topic", e);
+ throw new IllegalStateException("Internal Error updating function at the leader", e);
+ }
+ if (needsScheduling) {
+ this.schedulerManager.schedule();
}
-
- FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
-
- Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
- this.workerConfig.getWorkerId(), newFunctionMetaData);
-
- return submit(updateRequest);
}
-
/**
- * Sends a deregister request to the FMT (Function Metadata Topic) for a function
- * @param tenant the tenant the function that needs to be deregistered belongs to
- * @param namespace the namespace the function that needs to be deregistered belongs to
- * @param functionName the name of the function
- * @return a completable future of when the deregister has been applied
+ * Called by the leader service when this worker becomes the leader.
+ * We first get exclusive producer on the metadata topic. Next we drain the tailer
+ * to ensure that we have caught up to metadata topic. After which we close the tailer.
+ * Note that this method cannot be syncrhonized because the tailer might still be processing messages
*/
- public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
- FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
- FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
-
- Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
- this.workerConfig.getWorkerId(), newFunctionMetaData);
+ public void acquireLeadership() {
+ log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
+ FunctionMetaDataTopicTailer tailer = internalAcquireLeadership();
+ // Now that we have created the exclusive producer, wait for reader to get over
+ if (tailer != null) {
+ try {
+ tailer.stopWhenNoMoreMessages().get();
+ } catch (Exception e) {
+ log.error("Error while waiting for metadata tailer thread to finish", e);
+ errorNotifier.triggerError(e);
+ }
+ tailer.close();
+ }
+ log.info("FunctionMetaDataManager done becoming leader");
+ }
- return submit(deregisterRequest);
+ private synchronized FunctionMetaDataTopicTailer internalAcquireLeadership() {
+ if (exclusiveLeaderProducer == null) {
+ try {
+ exclusiveLeaderProducer = pulsarClient.newProducer()
+ .topic(this.workerConfig.getFunctionMetadataTopic())
+ .producerName(workerConfig.getWorkerId() + "-leader")
+ // .type(EXCLUSIVE)
+ .create();
+ } catch (PulsarClientException e) {
+ log.error("Error creating exclusive producer", e);
+ errorNotifier.triggerError(e);
+ }
+ } else {
+ log.error("Logic Error in FunctionMetaData Manager");
+ errorNotifier.triggerError(new IllegalStateException());
+ }
+ FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
+ this.functionMetaDataTopicTailer = null;
+ return tailer;
}
/**
- * Sends a start/stop function request to the FMT (Function Metadata Topic) for a function
- * @param tenant the tenant the function that needs to be deregistered belongs to
- * @param namespace the namespace the function that needs to be deregistered belongs to
- * @param functionName the name of the function
- * @param instanceId the instanceId of the function, -1 if for all instances
- * @param start do we need to start or stop
- * @return a completable future of when the start/stop has been applied
+ * called by the leader service when we lose leadership. We close the exclusive producer
+ * and start the tailer.
*/
- public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName,
- Integer instanceId, boolean start) {
- FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
-
- FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
-
- Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
- this.workerConfig.getWorkerId(), newFunctionMetaData);
-
- return submit(updateRequest);
+ public synchronized void giveupLeadership() {
+ log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
+ try {
+ exclusiveLeaderProducer.close();
+ exclusiveLeaderProducer = null;
+ initializeTailer();
+ } catch (PulsarClientException e) {
+ log.error("Error closing exclusive producer", e);
+ errorNotifier.triggerError(e);
+ }
}
/**
- * Processes a request received from the FMT (Function Metadata Topic)
- * @param messageId The message id of the request
- * @param serviceRequest The request
+ * This is called by the MetaData tailer. It updates the in-memory cache.
+ * It eats up any exception thrown by processUpdate/processDeregister since
+ * that's just part of the state machine
+ * @param message The message read from metadata topic that needs to be processed
*/
- public void processRequest(MessageId messageId, Request.ServiceRequest serviceRequest) {
-
- // make sure that processing requests don't happen simultaneously
- synchronized (this) {
+ public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
+ try {
+ Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
+ if (log.isDebugEnabled()) {
+ log.debug("Received Service Request: {}", serviceRequest);
+ }
switch (serviceRequest.getServiceRequestType()) {
case UPDATE:
- this.processUpdate(serviceRequest);
+ this.processUpdate(serviceRequest.getFunctionMetaData());
break;
case DELETE:
- this.proccessDeregister(serviceRequest);
+ this.proccessDeregister(serviceRequest.getFunctionMetaData());
break;
default:
log.warn("Received request with unrecognized type: {}", serviceRequest);
}
+ } catch (IllegalArgumentException e) {
+ // Its ok. Nothing much we can do about it
}
+ lastMessageSeen = message.getMessageId();
}
/**
@@ -284,48 +335,38 @@ public class FunctionMetaDataManager implements AutoCloseable {
}
@VisibleForTesting
- synchronized void proccessDeregister(Request.ServiceRequest deregisterRequest) {
+ synchronized boolean proccessDeregister(FunctionMetaData deregisterRequestFs) throws IllegalArgumentException {
- FunctionMetaData deregisterRequestFs = deregisterRequest.getFunctionMetaData();
String functionName = deregisterRequestFs.getFunctionDetails().getName();
String tenant = deregisterRequestFs.getFunctionDetails().getTenant();
String namespace = deregisterRequestFs.getFunctionDetails().getNamespace();
boolean needsScheduling = false;
- log.debug("Process deregister request: {}", deregisterRequest);
+ log.debug("Process deregister request: {}", deregisterRequestFs);
// Check if we still have this function. Maybe already deleted by someone else
if (this.containsFunctionMetaData(deregisterRequestFs)) {
// check if request is outdated
- if (!isRequestOutdated(deregisterRequest)) {
+ if (!isRequestOutdated(deregisterRequestFs)) {
this.functionMetaDataMap.get(tenant).get(namespace).remove(functionName);
- completeRequest(deregisterRequest, true);
needsScheduling = true;
} else {
if (log.isDebugEnabled()) {
log.debug("{}/{}/{} Ignoring outdated request version: {}", tenant, namespace, functionName,
- deregisterRequest.getFunctionMetaData().getVersion());
+ deregisterRequestFs.getVersion());
}
- completeRequest(deregisterRequest, false,
- "Request ignored because it is out of date. Please try again.");
+ throw new IllegalArgumentException("Delete request ignored because it is out of date. Please try again.");
}
- } else {
- // already deleted so just complete request
- completeRequest(deregisterRequest, true);
}
- if (!this.isInitializePhase() && needsScheduling) {
- this.schedulerManager.schedule();
- }
+ return needsScheduling;
}
@VisibleForTesting
- synchronized void processUpdate(Request.ServiceRequest updateRequest) {
+ synchronized boolean processUpdate(FunctionMetaData updateRequestFs) throws IllegalArgumentException {
- log.debug("Process update request: {}", updateRequest);
-
- FunctionMetaData updateRequestFs = updateRequest.getFunctionMetaData();
+ log.debug("Process update request: {}", updateRequestFs);
boolean needsScheduling = false;
@@ -334,52 +375,23 @@ public class FunctionMetaDataManager implements AutoCloseable {
// Since this is the first time worker has seen function, just put it into internal function metadata store
setFunctionMetaData(updateRequestFs);
needsScheduling = true;
- completeRequest(updateRequest, true);
} else {
// The request is an update to an existing function since this worker already has a record of this function
// in its function metadata store
// Check if request is outdated
- if (!isRequestOutdated(updateRequest)) {
+ if (!isRequestOutdated(updateRequestFs)) {
// update the function metadata
setFunctionMetaData(updateRequestFs);
needsScheduling = true;
- completeRequest(updateRequest, true);
} else {
- completeRequest(updateRequest, false,
- "Request ignored because it is out of date. Please try again.");
+ throw new IllegalArgumentException("Update request ignored because it is out of date. Please try again.");
}
}
- if (!this.isInitializePhase() && needsScheduling) {
- this.schedulerManager.schedule();
- }
+ return needsScheduling;
}
- /**
- * Complete requests that this worker has pending
- * @param serviceRequest
- * @param isSuccess
- * @param message
- */
- private void completeRequest(Request.ServiceRequest serviceRequest, boolean isSuccess, String message) {
- ServiceRequestInfo pendingServiceRequestInfo
- = this.pendingServiceRequests.getOrDefault(
- serviceRequest.getRequestId(), null);
- if (pendingServiceRequestInfo != null) {
- RequestResult requestResult = new RequestResult();
- requestResult.setSuccess(isSuccess);
- requestResult.setMessage(message);
- pendingServiceRequestInfo.getRequestResultCompletableFuture().complete(requestResult);
- }
- }
-
- private void completeRequest(Request.ServiceRequest serviceRequest, boolean isSuccess) {
- completeRequest(serviceRequest, isSuccess, null);
- }
-
-
- private boolean isRequestOutdated(Request.ServiceRequest serviceRequest) {
- FunctionMetaData requestFunctionMetaData = serviceRequest.getFunctionMetaData();
+ private boolean isRequestOutdated(FunctionMetaData requestFunctionMetaData) {
Function.FunctionDetails functionDetails = requestFunctionMetaData.getFunctionDetails();
FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(functionDetails.getTenant())
.get(functionDetails.getNamespace()).get(functionDetails.getName());
@@ -401,44 +413,10 @@ public class FunctionMetaDataManager implements AutoCloseable {
.get(functionDetails.getNamespace()).put(functionDetails.getName(), functionMetaData);
}
- @VisibleForTesting
- CompletableFuture<RequestResult> submit(Request.ServiceRequest serviceRequest) {
- ServiceRequestInfo serviceRequestInfo = ServiceRequestInfo.of(serviceRequest);
- CompletableFuture<MessageId> messageIdCompletableFuture = this.serviceRequestManager.submitRequest(serviceRequest);
-
- serviceRequestInfo.setCompletableFutureRequestMessageId(messageIdCompletableFuture);
- CompletableFuture<RequestResult> requestResultCompletableFuture = new CompletableFuture<>();
-
- serviceRequestInfo.setRequestResultCompletableFuture(requestResultCompletableFuture);
-
- this.pendingServiceRequests.put(serviceRequestInfo.getServiceRequest().getRequestId(), serviceRequestInfo);
-
- messageIdCompletableFuture.exceptionally(ex -> {
- FunctionDetails metadata = serviceRequest.getFunctionMetaData().getFunctionDetails();
- log.warn("Failed to submit function metadata for {}/{}/{}-{}", metadata.getTenant(),
- metadata.getNamespace(), metadata.getName(), ex.getMessage());
- serviceRequestInfo.getRequestResultCompletableFuture()
- .completeExceptionally(new RuntimeException("Failed to submit function metadata"));
- return null;
- });
-
- return requestResultCompletableFuture;
- }
-
- @Override
- public void close() throws Exception {
- if (this.functionMetaDataTopicTailer != null) {
- this.functionMetaDataTopicTailer.close();
- }
- if (this.serviceRequestManager != null) {
- this.serviceRequestManager.close();
- }
- }
-
- private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
- return new ServiceRequestManager(pulsarClient.newProducer()
- .topic(functionMetadataTopic)
- .producerName(workerConfig.getWorkerId() + "-function-metadata-manager")
- .create());
+ private void initializeTailer() throws PulsarClientException {
+ this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
+ pulsarClient.newReader(), this.workerConfig, lastMessageSeen, this.errorNotifier);
+ this.functionMetaDataTopicTailer.start();
+ log.info("MetaData Manager Tailer started");
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index f0626ce..8b51971 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.functions.worker;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -39,21 +41,20 @@ public class FunctionMetaDataTopicTailer
private final Thread readerThread;
private volatile boolean running;
private ErrorNotifier errorNotifier;
+ private volatile boolean stopOnNoMessageAvailable;
+ private CompletableFuture<Void> exitFuture = new CompletableFuture<>();
public FunctionMetaDataTopicTailer(FunctionMetaDataManager functionMetaDataManager,
ReaderBuilder readerBuilder, WorkerConfig workerConfig,
+ MessageId lastMessageSeen,
ErrorNotifier errorNotifier)
throws PulsarClientException {
this.functionMetaDataManager = functionMetaDataManager;
- this.reader = readerBuilder
- .topic(workerConfig.getFunctionMetadataTopic())
- .startMessageId(MessageId.earliest)
- .readerName(workerConfig.getWorkerId() + "-function-metadata-tailer")
- .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer")
- .create();
+ this.reader = createReader(workerConfig, readerBuilder, lastMessageSeen);
readerThread = new Thread(this);
readerThread.setName("function-metadata-tailer-thread");
this.errorNotifier = errorNotifier;
+ stopOnNoMessageAvailable = false;
}
public void start() {
@@ -63,10 +64,22 @@ public class FunctionMetaDataTopicTailer
@Override
public void run() {
- while(running) {
+ while (running) {
+ if (stopOnNoMessageAvailable) {
+ try {
+ if (!reader.hasMessageAvailable()) {
+ break;
+ }
+ } catch (PulsarClientException e) {
+ log.error("Received exception while testing hasMessageAvailable", e);
+ errorNotifier.triggerError(e);
+ }
+ }
try {
- Message<byte[]> msg = reader.readNext();
- processRequest(msg);
+ Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+ if (msg != null) {
+ this.functionMetaDataManager.processMetaDataTopicMessage(msg);
+ }
} catch (Throwable th) {
if (running) {
log.error("Encountered error in metadata tailer", th);
@@ -77,10 +90,16 @@ public class FunctionMetaDataTopicTailer
if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
log.warn("Encountered error when metadata tailer is not running", th);
}
- return;
}
}
}
+ log.info("metadata tailer thread exiting");
+ exitFuture.complete(null);
+ }
+
+ public CompletableFuture<Void> stopWhenNoMoreMessages() {
+ stopOnNoMessageAvailable = true;
+ return exitFuture;
}
@Override
@@ -88,23 +107,35 @@ public class FunctionMetaDataTopicTailer
log.info("Stopping function metadata tailer");
try {
running = false;
- if (readerThread != null && readerThread.isAlive()) {
+ while (true) {
readerThread.interrupt();
+ try {
+ readerThread.join(5000, 0);
+ } catch (InterruptedException e) {
+ log.warn("Waiting for metadata tailer thread to stop is interrupted", e);
+ }
+
+ if (readerThread.isAlive()) {
+ log.warn("metadata tailer thread is still alive. Will attempt to interrupt again.");
+ } else {
+ break;
+ }
}
- if (reader != null) {
- reader.close();
- }
+
+ reader.close();
} catch (IOException e) {
log.error("Failed to stop function metadata tailer", e);
}
log.info("Stopped function metadata tailer");
}
- public void processRequest(Message<byte[]> msg) throws IOException {
- ServiceRequest serviceRequest = ServiceRequest.parseFrom(msg.getData());
- if (log.isDebugEnabled()) {
- log.debug("Received Service Request: {}", serviceRequest);
- }
- this.functionMetaDataManager.processRequest(msg.getMessageId(), serviceRequest);
+ public static Reader createReader(WorkerConfig workerConfig, ReaderBuilder readerBuilder,
+ MessageId startMessageId) throws PulsarClientException {
+ return readerBuilder
+ .topic(workerConfig.getFunctionMetadataTopic())
+ .startMessageId(startMessageId)
+ .readerName(workerConfig.getWorkerId() + "-function-metadata-tailer")
+ .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer")
+ .create();
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index 43faccc..81a1aa1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -106,8 +106,12 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
// is the leader and may need to start computing and writing assignments
schedulerManager.initialize();
+ functionMetaDataManager.acquireLeadership();
// indicate leader initialization is complete
leaderInitComplete.set(true);
+ // Once we become leader we need to schedule
+ // This is done after leaderInitComplete because schedule waits on that becoming true
+ schedulerManager.schedule();
} catch (Throwable th) {
log.error("Encountered error when initializing to become leader", th);
errorNotifier.triggerError(th);
@@ -131,6 +135,7 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
} else {
functionAssignmentTailer.startFromMessage(schedulerManager.getLastMessageProduced());
}
+ functionMetaDataManager.giveupLeadership();
leaderInitComplete.set(false);
} catch (Throwable th) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index ede4fc8..35dff9e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -217,6 +217,7 @@ public class WorkerService {
// initialize function runtime manager
log.info("/** Initializing Runtime Manager **/");
+
MessageId lastAssignmentMessageId = functionRuntimeManager.initialize();
// Setting references to managers in scheduler
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/RequestResult.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/RequestResult.java
deleted file mode 100644
index 7cccab1..0000000
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/RequestResult.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker.request;
-
-import com.google.gson.Gson;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.experimental.Accessors;
-
-@Accessors(chain = true)
-@Getter
-@Setter
-public class RequestResult {
- private boolean success;
- private String message;
-
- public boolean isSuccess() {
- return success;
- }
-
- public String toJson() {
- return new Gson().toJson(this);
- }
-}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestInfo.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestInfo.java
deleted file mode 100644
index 9b652d0..0000000
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestInfo.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker.request;
-
-import lombok.Data;
-import lombok.Setter;
-import lombok.experimental.Accessors;
-import org.apache.pulsar.client.api.MessageId;
-
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.pulsar.functions.proto.Request.ServiceRequest;
-
-@Data
-@Accessors(chain = true)
-public class ServiceRequestInfo {
- private final ServiceRequest serviceRequest;
- @Setter
- private CompletableFuture<MessageId> completableFutureRequestMessageId;
- @Setter
- private CompletableFuture<RequestResult> requestResultCompletableFuture;
-
- private ServiceRequestInfo(ServiceRequest serviceRequest) {
- this.serviceRequest = serviceRequest;
- }
-
- public static ServiceRequestInfo of (ServiceRequest serviceRequest) {
- return new ServiceRequestInfo(serviceRequest);
- }
-}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestManager.java
deleted file mode 100644
index 266276b..0000000
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker.request;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.proto.Request.ServiceRequest;
-
-import java.util.concurrent.CompletableFuture;
-
-@Slf4j
-public class ServiceRequestManager implements AutoCloseable {
-
- private final Producer producer;
-
- public ServiceRequestManager(Producer producer) throws PulsarClientException {
- this.producer = producer;
- }
-
- public CompletableFuture<MessageId> submitRequest(ServiceRequest serviceRequest) {
- if (log.isDebugEnabled()) {
- log.debug("Submitting Service Request: {}", serviceRequest);
- }
- return producer.sendAsync(serviceRequest.toByteArray());
- }
-
- @Override
- public void close() {
- try {
- this.producer.close();
- } catch (PulsarClientException e) {
- log.warn("Failed to close producer for service request manager", e);
- }
- }
-}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestUtils.java
deleted file mode 100644
index b31de0a..0000000
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/request/ServiceRequestUtils.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker.request;
-
-import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Request.ServiceRequest;
-
-import java.util.UUID;
-
-public class ServiceRequestUtils {
- public static ServiceRequest getServiceRequest(String requestId, String workerId,
- ServiceRequest.ServiceRequestType serviceRequestType,
- FunctionMetaData functionMetaData) {
- ServiceRequest.Builder serviceRequestBuilder
- = ServiceRequest.newBuilder()
- .setRequestId(requestId)
- .setWorkerId(workerId)
- .setServiceRequestType(serviceRequestType);
- if (functionMetaData != null) {
- serviceRequestBuilder.setFunctionMetaData(functionMetaData);
- }
- return serviceRequestBuilder.build();
- }
-
- public static ServiceRequest getUpdateRequest(String workerId, FunctionMetaData functionMetaData) {
- return getServiceRequest(
- UUID.randomUUID().toString(),
- workerId,
- ServiceRequest.ServiceRequestType.UPDATE, functionMetaData);
- }
-
- public static ServiceRequest getDeregisterRequest(String workerId, FunctionMetaData functionMetaData) {
- return getServiceRequest(
- UUID.randomUUID().toString(),
- workerId,
- ServiceRequest.ServiceRequestType.DELETE, functionMetaData);
- }
-
- public static ServiceRequest getIntializationRequest(String requestId, String workerId) {
- return getServiceRequest(
- requestId,
- workerId,
- ServiceRequest.ServiceRequestType.INITIALIZE, null);
- }
-}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index f49971b..2e31f73 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -45,6 +45,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -80,7 +81,6 @@ import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import java.io.File;
@@ -94,7 +94,6 @@ import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -392,24 +391,13 @@ public abstract class ComponentImpl {
throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}
- CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
- namespace, componentName);
-
- RequestResult requestResult = null;
- try {
- requestResult = completableFuture.get();
- if (!requestResult.isSuccess()) {
- throw new RestException(Status.BAD_REQUEST, requestResult.getMessage());
- }
- } catch (ExecutionException e) {
- log.error("Execution Exception while deregistering {} @ /{}/{}/{}",
- ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
- } catch (InterruptedException e) {
- log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}",
- ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
- throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
- }
+ FunctionMetaData newVersionedMetaData = FunctionMetaDataUtils.incrMetadataVersion(functionMetaData, functionMetaData);
+ internalProcessFunctionRequest(newVersionedMetaData.getFunctionDetails().getTenant(),
+ newVersionedMetaData.getFunctionDetails().getNamespace(),
+ newVersionedMetaData.getFunctionDetails().getName(),
+ newVersionedMetaData, true,
+ String.format("Error deleting {} @ /{}/{}/{}",
+ ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));
// clean up component files stored in BK
if (!functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) && !functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) {
@@ -534,15 +522,10 @@ public abstract class ComponentImpl {
throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted"));
}
- try {
- functionMetaDataManager.changeFunctionInstanceStatus(tenant, namespace, componentName,
- Integer.parseInt(instanceId), start);
- } catch (WebApplicationException we) {
- throw we;
- } catch (Exception e) {
- log.error("Failed to start/stop {}: {}/{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, instanceId, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
+ FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, Integer.parseInt(instanceId), start);
+ internalProcessFunctionRequest(tenant, namespace, componentName, newFunctionMetaData, false,
+ String.format("Failed to start/stop {}: {}/{}/{}/{}", ComponentTypeUtils.toString(componentType),
+ tenant, namespace, componentName, instanceId));
}
public void restartFunctionInstance(final String tenant,
@@ -662,14 +645,9 @@ public abstract class ComponentImpl {
throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted"));
}
- try {
- functionMetaDataManager.changeFunctionInstanceStatus(tenant, namespace, componentName, -1, start);
- } catch (WebApplicationException we) {
- throw we;
- } catch (Exception e) {
- log.error("Failed to start/stop {}: {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
+ FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, -1, start);
+ internalProcessFunctionRequest(tenant, namespace, componentName, newFunctionMetaData, false,
+ String.format("Failed to start/stop {}: {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));
}
public void restartFunctionInstances(final String tenant,
@@ -880,25 +858,12 @@ public abstract class ComponentImpl {
return retVals;
}
- void updateRequest(final FunctionMetaData functionMetaData) {
-
- // Submit to FMT
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
- CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.updateFunction(functionMetaData);
-
- RequestResult requestResult = null;
- try {
- requestResult = completableFuture.get();
- if (!requestResult.isSuccess()) {
- throw new RestException(Status.BAD_REQUEST, requestResult.getMessage());
- }
- } catch (ExecutionException e) {
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- } catch (InterruptedException e) {
- throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
- }
-
+ void updateRequest(FunctionMetaData existingFunctionMetaData, final FunctionMetaData functionMetaData) {
+ FunctionMetaData updatedVersionMetaData = FunctionMetaDataUtils.incrMetadataVersion(existingFunctionMetaData, functionMetaData);
+ internalProcessFunctionRequest(updatedVersionMetaData.getFunctionDetails().getTenant(),
+ updatedVersionMetaData.getFunctionDetails().getNamespace(),
+ updatedVersionMetaData.getFunctionDetails().getName(),
+ updatedVersionMetaData, false, "Update Failed");
}
public List<ConnectorDefinition> getListOfConnectors() {
@@ -1564,4 +1529,24 @@ public abstract class ComponentImpl {
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
+
+ private void internalProcessFunctionRequest(final String tenant, final String namespace, final String functionName,
+ final FunctionMetaData functionMetadata, boolean delete, String errorMsg) {
+ try {
+ if (worker().getLeaderService().isLeader()) {
+ worker().getFunctionMetaDataManager().updateFunctionOnLeader(functionMetadata, delete);
+ } else {
+ FunctionsImpl functions = (FunctionsImpl) worker().getFunctionAdmin().functions();
+ functions.updateOnWorkerLeader(tenant,
+ namespace, functionName, functionMetadata.toByteArray(), delete);
+ }
+ } catch (PulsarAdminException e) {
+ log.error(errorMsg, e);
+ throw new RestException(e.getStatusCode(), e.getMessage());
+ } catch (IllegalStateException e) {
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ } catch (IllegalArgumentException e) {
+ throw new RestException(Status.BAD_REQUEST, e.getMessage());
+ }
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 0e4c4a0..b5c641a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.util.RestException;
@@ -45,6 +46,7 @@ import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -226,7 +228,7 @@ public class FunctionsImpl extends ComponentImpl {
}
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
- updateRequest(functionMetaDataBuilder.build());
+ updateRequest(null, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
if (functionPkgUrl == null || !functionPkgUrl.startsWith(Utils.FILE)) {
@@ -417,7 +419,7 @@ public class FunctionsImpl extends ComponentImpl {
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
- updateRequest(functionMetaDataBuilder.build());
+ updateRequest(existingComponent, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
if ((functionPkgUrl != null && !functionPkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
@@ -647,6 +649,60 @@ public class FunctionsImpl extends ComponentImpl {
return functionStatus;
}
+ public void updateFunctionOnWorkerLeader(final String tenant,
+ final String namespace,
+ final String functionName,
+ final InputStream uploadedInputStream,
+ final boolean delete,
+ URI uri,
+ final String clientRole) {
+
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ if (worker().getWorkerConfig().isAuthorizationEnabled()) {
+ if (!isSuperUser(clientRole)) {
+ log.error("{}/{}/{} Client [{}] is not superuser to update on worker leader {}", tenant, namespace,
+ functionName, clientRole, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ }
+
+ if (tenant == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+ }
+ if (namespace == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+ }
+ if (functionName == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Function name is not provided");
+ }
+ Function.FunctionMetaData functionMetaData;
+ try {
+ functionMetaData = Function.FunctionMetaData.parseFrom(uploadedInputStream);
+ } catch (IOException e) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Corrupt Function MetaData");
+ }
+
+ // Redirect if we are not the leader
+ if (!worker().getLeaderService().isLeader()) {
+ WorkerInfo workerInfo = worker().getMembershipManager().getLeader();
+ URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
+
+ // Its possible that we are not the leader anymore. That will be taken care of by FunctionMetaDataManager
+ FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+ try {
+ functionMetaDataManager.updateFunctionOnLeader(functionMetaData, delete);
+ } catch (IllegalStateException e) {
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ } catch (IllegalArgumentException e) {
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+ }
+
private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
final String namespace,
final String componentName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 095d8e9..a9bc573 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -223,7 +223,7 @@ public class SinksImpl extends ComponentImpl {
}
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
- updateRequest(functionMetaDataBuilder.build());
+ updateRequest(null, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
if (sinkPkgUrl == null || !sinkPkgUrl.startsWith(Utils.FILE)) {
@@ -417,7 +417,7 @@ public class SinksImpl extends ComponentImpl {
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
- updateRequest(functionMetaDataBuilder.build());
+ updateRequest(existingComponent, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
if ((sinkPkgUrl != null && !sinkPkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index bbe5920..187a7ae 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -223,7 +223,7 @@ public class SourcesImpl extends ComponentImpl {
}
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
- updateRequest(functionMetaDataBuilder.build());
+ updateRequest(null, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
if (sourcePkgUrl == null || !sourcePkgUrl.startsWith(Utils.FILE)) {
@@ -414,7 +414,7 @@ public class SourcesImpl extends ComponentImpl {
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
- updateRequest(functionMetaDataBuilder.build());
+ updateRequest(existingComponent, functionMetaDataBuilder.build());
} finally {
if (componentPackageFile != null && componentPackageFile.exists()) {
if ((sourcePkgUrl != null && !sourcePkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 8e32cda..0fe8c48 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -44,6 +44,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
@@ -366,4 +367,25 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
final @FormDataParam("state") FunctionState stateJson) throws IOException {
functions.putFunctionState(tenant, namespace, functionName, key, stateJson, clientAppId(), clientAuthData());
}
+
+ @PUT
+ @ApiOperation(value = "Updates a Pulsar Function on the worker leader", hidden = true)
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have super-user permissions"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 307, message = "Redirecting to the worker leader"),
+ @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+ })
+ @Path("/leader/{tenant}/{namespace}/{functionName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("functionMetaData") InputStream uploadedInputStream,
+ final @FormDataParam("delete") boolean delete) {
+
+ functions.updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
+ delete, uri.getRequestUri(), clientAppId());
+ }
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 20f30a9..b5fb0df 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -20,26 +20,15 @@ package org.apache.pulsar.functions.worker;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.*;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Request;
-import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -97,7 +86,7 @@ public class FunctionMetaDataManagerTest {
}
@Test
- public void updateFunction() throws PulsarClientException {
+ public void testUpdateIfLeaderFunction() throws PulsarClientException {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
@@ -106,309 +95,167 @@ public class FunctionMetaDataManagerTest {
mock(SchedulerManager.class),
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
+ .setVersion(1)
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();
- Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class));
- functionMetaDataManager.updateFunction(m1);
- verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
- verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
- @Override
- public boolean matches(Request.ServiceRequest serviceRequest) {
- if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId())) {
- return false;
- }
- if (!serviceRequest.getServiceRequestType().equals(Request.ServiceRequest.ServiceRequestType.UPDATE)) {
- return false;
- }
- if (!serviceRequest.getFunctionMetaData().equals(m1)) {
- return false;
- }
- if (serviceRequest.getFunctionMetaData().getVersion() != 0) {
- return false;
- }
- return true;
- }
- }));
-
- // already have record
- long version = 5;
- functionMetaDataManager = spy(
- new FunctionMetaDataManager(workerConfig,
- mock(SchedulerManager.class),
- mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
- Map<String, Function.FunctionMetaData> functionMetaDataMap = new HashMap<>();
- Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
- functionMetaDataMap.put("func-1", m2);
- functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
- functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap);
- Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class));
-
- functionMetaDataManager.updateFunction(m2);
- verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
- verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
- @Override
- public boolean matches(Request.ServiceRequest serviceRequest) {
- if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
- return false;
- if (!serviceRequest.getServiceRequestType().equals(
- Request.ServiceRequest.ServiceRequestType.UPDATE)) {
- return false;
- }
- if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m2.getFunctionDetails())) {
- return false;
- }
- if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
- return false;
- }
- return true;
- }
- }));
-
- }
-
- @Test
- public void testStopFunction() throws PulsarClientException {
-
- long version = 5;
- WorkerConfig workerConfig = new WorkerConfig();
- workerConfig.setWorkerId("worker-1");
- FunctionMetaDataManager functionMetaDataManager = spy(
- new FunctionMetaDataManager(workerConfig,
- mock(SchedulerManager.class),
- mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-
- Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
- Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
- Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
- functionMetaDataMap1.put("func-1", f1);
-
- Assert.assertTrue(FunctionMetaDataUtils.canChangeState(f1, 0, Function.FunctionState.STOPPED));
- Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 0, Function.FunctionState.RUNNING));
- Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 2, Function.FunctionState.STOPPED));
- Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 2, Function.FunctionState.RUNNING));
-
- functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
- functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap1);
-
- Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class));
-
- functionMetaDataManager.changeFunctionInstanceStatus("tenant-1", "namespace-1", "func-1", 0, false);
-
- verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
- verify(functionMetaDataManager).submit(argThat(serviceRequest -> {
- if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
- return false;
- if (!serviceRequest.getServiceRequestType().equals(
- Request.ServiceRequest.ServiceRequestType.UPDATE)) {
- return false;
- }
- if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(f1.getFunctionDetails())) {
- return false;
- }
- if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
- return false;
- }
- Map<Integer, Function.FunctionState> stateMap = serviceRequest.getFunctionMetaData().getInstanceStatesMap();
- if (stateMap == null || stateMap.isEmpty()) {
- return false;
- }
- if (stateMap.get(1) != Function.FunctionState.RUNNING) {
- return false;
- }
- if (stateMap.get(0) != Function.FunctionState.STOPPED) {
- return false;
- }
- return true;
- }));
+ // update when you are not the leader
+ try {
+ functionMetaDataManager.updateFunctionOnLeader(m1, false);
+ Assert.assertTrue(false);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(), "Not the leader");
+ }
+
+ // become leader
+ functionMetaDataManager.acquireLeadership();
+ // Now w should be able to really update
+ functionMetaDataManager.updateFunctionOnLeader(m1, false);
+
+ // outdated request
+ try {
+ functionMetaDataManager.updateFunctionOnLeader(m1, false);
+ Assert.assertTrue(false);
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals(e.getMessage(), "Update request ignored because it is out of date. Please try again.");
+ }
+ // udpate with new version
+ m1 = m1.toBuilder().setVersion(2).build();
+ functionMetaDataManager.updateFunctionOnLeader(m1, false);
}
@Test
public void deregisterFunction() throws PulsarClientException {
- long version = 5;
+ SchedulerManager mockedScheduler = mock(SchedulerManager.class);
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
- mock(SchedulerManager.class),
+ mockedScheduler,
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
+ .setVersion(1)
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
- Map<String, Function.FunctionMetaData> functionMetaDataMap = new HashMap<>();
- functionMetaDataMap.put("func-1", m1);
- functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
- functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap);
- Mockito.doReturn(null).when(functionMetaDataManager).submit(any(Request.ServiceRequest.class));
-
- functionMetaDataManager.deregisterFunction("tenant-1", "namespace-1", "func-1");
-
- verify(functionMetaDataManager, times(1)).submit(any(Request.ServiceRequest.class));
- verify(functionMetaDataManager).submit(argThat(new ArgumentMatcher<Request.ServiceRequest>() {
- @Override
- public boolean matches(Request.ServiceRequest serviceRequest) {
- if (!serviceRequest.getWorkerId().equals(workerConfig.getWorkerId()))
- return false;
- if (!serviceRequest.getServiceRequestType().equals(
- Request.ServiceRequest.ServiceRequestType.DELETE)) {
- return false;
- }
- if (!serviceRequest.getFunctionMetaData().getFunctionDetails().equals(m1.getFunctionDetails())) {
- return false;
- }
- if (serviceRequest.getFunctionMetaData().getVersion() != (version + 1)) {
- return false;
- }
- return true;
- }
- }));
+ .setNamespace("namespace-1").setTenant("tenant-1")).build();
+
+ // Try deleting when you are not the leader
+ try {
+ functionMetaDataManager.updateFunctionOnLeader(m1, true);
+ Assert.assertTrue(false);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(), "Not the leader");
+ }
+
+ // become leader
+ functionMetaDataManager.acquireLeadership();
+ verify(mockedScheduler, times(0)).schedule();
+ // Now try deleting
+ functionMetaDataManager.updateFunctionOnLeader(m1, true);
+ // make sure schedule was not called because function didn't exist.
+ verify(mockedScheduler, times(0)).schedule();
+
+ // insert function
+ functionMetaDataManager.updateFunctionOnLeader(m1, false);
+ verify(mockedScheduler, times(1)).schedule();
+
+ // outdated request
+ try {
+ functionMetaDataManager.updateFunctionOnLeader(m1, true);
+ Assert.assertTrue(false);
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals(e.getMessage(), "Delete request ignored because it is out of date. Please try again.");
+ }
+ verify(mockedScheduler, times(1)).schedule();
+
+ // udpate with new version
+ m1 = m1.toBuilder().setVersion(2).build();
+ functionMetaDataManager.updateFunctionOnLeader(m1, true);
+ verify(mockedScheduler, times(2)).schedule();
}
@Test
- public void testProcessRequest() throws PulsarClientException {
+ public void testProcessRequest() throws PulsarClientException, IOException {
WorkerConfig workerConfig = new WorkerConfig();
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
- Mockito.doNothing().when(functionMetaDataManager).processUpdate(any(Request.ServiceRequest.class));
- Mockito.doNothing().when(functionMetaDataManager).proccessDeregister(any(Request.ServiceRequest.class));
+ doReturn(true).when(functionMetaDataManager).processUpdate(any(Function.FunctionMetaData.class));
+ doReturn(true).when(functionMetaDataManager).proccessDeregister(any(Function.FunctionMetaData.class));
Request.ServiceRequest serviceRequest
= Request.ServiceRequest.newBuilder().setServiceRequestType(
Request.ServiceRequest.ServiceRequestType.UPDATE).build();
- functionMetaDataManager.processRequest(MessageId.earliest, serviceRequest);
+ Message msg = mock(Message.class);
+ doReturn(serviceRequest.toByteArray()).when(msg).getData();
+ functionMetaDataManager.processMetaDataTopicMessage(msg);
verify(functionMetaDataManager, times(1)).processUpdate
- (any(Request.ServiceRequest.class));
- verify(functionMetaDataManager).processUpdate(serviceRequest);
+ (any(Function.FunctionMetaData.class));
+ verify(functionMetaDataManager).processUpdate(serviceRequest.getFunctionMetaData());
serviceRequest
= Request.ServiceRequest.newBuilder().setServiceRequestType(
Request.ServiceRequest.ServiceRequestType.INITIALIZE).build();
- functionMetaDataManager.processRequest(MessageId.earliest, serviceRequest);
+ doReturn(serviceRequest.toByteArray()).when(msg).getData();
+ functionMetaDataManager.processMetaDataTopicMessage(msg);
serviceRequest
= Request.ServiceRequest.newBuilder().setServiceRequestType(
Request.ServiceRequest.ServiceRequestType.DELETE).build();
- functionMetaDataManager.processRequest(MessageId.earliest, serviceRequest);
+ doReturn(serviceRequest.toByteArray()).when(msg).getData();
+ functionMetaDataManager.processMetaDataTopicMessage(msg);
verify(functionMetaDataManager, times(1)).proccessDeregister(
- any(Request.ServiceRequest.class));
- verify(functionMetaDataManager).proccessDeregister(serviceRequest);
+ any(Function.FunctionMetaData.class));
+ verify(functionMetaDataManager).proccessDeregister(serviceRequest.getFunctionMetaData());
}
@Test
public void processUpdateTest() throws PulsarClientException {
- long version = 5;
+ SchedulerManager schedulerManager = mock(SchedulerManager.class);
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- SchedulerManager schedulerManager = mock(SchedulerManager.class);
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-
- // worker has no record of function
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
+ .setVersion(1)
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-
- Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
- .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
- .setFunctionMetaData(m1)
- .setWorkerId("worker-1")
- .build();
- functionMetaDataManager.processUpdate(serviceRequest);
+ .setNamespace("namespace-1").setTenant("tenant-1")).build();
+
+ Assert.assertTrue(functionMetaDataManager.processUpdate(m1));
verify(functionMetaDataManager, times(1))
.setFunctionMetaData(any(Function.FunctionMetaData.class));
- verify(schedulerManager, times(1)).schedule();
+ verify(schedulerManager, times(0)).schedule();
Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-1"));
Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
- // worker has record of function
-
- // request is oudated
- schedulerManager = mock(SchedulerManager.class);
- functionMetaDataManager = spy(
- new FunctionMetaDataManager(workerConfig,
- schedulerManager,
- mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-
- Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
- functionMetaDataManager.setFunctionMetaData(m3);
- Function.FunctionMetaData outdated = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version - 1).build();
-
- serviceRequest = Request.ServiceRequest.newBuilder()
- .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
- .setFunctionMetaData(outdated)
- .setWorkerId("worker-1")
- .build();
- functionMetaDataManager.processUpdate(serviceRequest);
-
- Assert.assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
- "tenant-1", "namespace-1", "func-1"));
+ // outdated request
+ try {
+ functionMetaDataManager.processUpdate(m1);
+ Assert.assertTrue(false);
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals(e.getMessage(), "Update request ignored because it is out of date. Please try again.");
+ }
verify(functionMetaDataManager, times(1))
.setFunctionMetaData(any(Function.FunctionMetaData.class));
verify(schedulerManager, times(0)).schedule();
-
- Function.FunctionMetaData outdated2 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
-
- serviceRequest = Request.ServiceRequest.newBuilder()
- .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
- .setFunctionMetaData(outdated2)
- .setWorkerId("worker-2")
- .build();
- functionMetaDataManager.processUpdate(serviceRequest);
- Assert.assertEquals(m3, functionMetaDataManager.getFunctionMetaData(
- "tenant-1", "namespace-1", "func-1"));
- verify(functionMetaDataManager, times(1))
- .setFunctionMetaData(any(Function.FunctionMetaData.class));
- verify(schedulerManager, times(0)).schedule();
-
Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-1"));
Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
- // schedule
- schedulerManager = mock(SchedulerManager.class);
- functionMetaDataManager = spy(
- new FunctionMetaDataManager(workerConfig,
- schedulerManager,
- mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
-
- Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
- functionMetaDataManager.setFunctionMetaData(m4);
- Function.FunctionMetaData m5 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version + 1).build();
-
- serviceRequest = Request.ServiceRequest.newBuilder()
- .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
- .setFunctionMetaData(m5)
- .setWorkerId("worker-2")
- .build();
- functionMetaDataManager.processUpdate(serviceRequest);
-
+ // udpate with new version
+ m1 = m1.toBuilder().setVersion(2).build();
+ Assert.assertTrue(functionMetaDataManager.processUpdate(m1));
verify(functionMetaDataManager, times(2))
.setFunctionMetaData(any(Function.FunctionMetaData.class));
- verify(schedulerManager, times(1)).schedule();
-
- Assert.assertEquals(m1.toBuilder().setVersion(version + 1).build(),
- functionMetaDataManager.functionMetaDataMap.get(
+ verify(schedulerManager, times(0)).schedule();
+ Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-1"));
Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
@@ -416,90 +263,56 @@ public class FunctionMetaDataManagerTest {
@Test
public void processDeregister() throws PulsarClientException {
- long version = 5;
+ SchedulerManager schedulerManager = mock(SchedulerManager.class);
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- SchedulerManager schedulerManager = mock(SchedulerManager.class);
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
- // worker has no record of function
- Function.FunctionMetaData test = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
- functionMetaDataManager.setFunctionMetaData(test);
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
+ .setVersion(1)
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
- Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
- .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
- .setFunctionMetaData(m1)
- .setWorkerId("worker-1")
- .build();
- functionMetaDataManager.proccessDeregister(serviceRequest);
+ .setNamespace("namespace-1").setTenant("tenant-1")).build();
+ Assert.assertFalse(functionMetaDataManager.proccessDeregister(m1));
+ verify(functionMetaDataManager, times(0))
+ .setFunctionMetaData(any(Function.FunctionMetaData.class));
verify(schedulerManager, times(0)).schedule();
- Assert.assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
- "tenant-1").get("namespace-1").get("func-2"));
+ Assert.assertEquals(0, functionMetaDataManager.functionMetaDataMap.size());
+
+ // insert something
+ Assert.assertTrue(functionMetaDataManager.processUpdate(m1));
+ verify(functionMetaDataManager, times(1))
+ .setFunctionMetaData(any(Function.FunctionMetaData.class));
+ verify(schedulerManager, times(0)).schedule();
+ Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
+ "tenant-1").get("namespace-1").get("func-1"));
Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
- // function exists but request outdated
- schedulerManager = mock(SchedulerManager.class);
- functionMetaDataManager = spy(
- new FunctionMetaDataManager(workerConfig,
- schedulerManager,
- mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
- functionMetaDataManager.setFunctionMetaData(test);
- Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
- functionMetaDataManager.setFunctionMetaData(m2);
- serviceRequest = Request.ServiceRequest.newBuilder()
- .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
- .setFunctionMetaData(m2)
- .setWorkerId("worker-1")
- .build();
-
- functionMetaDataManager.proccessDeregister(serviceRequest);
+ // outdated delete request
+ try {
+ functionMetaDataManager.proccessDeregister(m1);
+ Assert.assertTrue(false);
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals(e.getMessage(), "Delete request ignored because it is out of date. Please try again.");
+ }
+ verify(functionMetaDataManager, times(1))
+ .setFunctionMetaData(any(Function.FunctionMetaData.class));
verify(schedulerManager, times(0)).schedule();
-
- Assert.assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
- "tenant-1").get("namespace-1").get("func-2"));
- Assert.assertEquals(m2, functionMetaDataManager.functionMetaDataMap.get(
+ Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").get("func-1"));
- Assert.assertEquals(2, functionMetaDataManager.functionMetaDataMap.get(
+ Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
- // function deleted
- schedulerManager = mock(SchedulerManager.class);
- functionMetaDataManager = spy(
- new FunctionMetaDataManager(workerConfig,
- schedulerManager,
- mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
- functionMetaDataManager.setFunctionMetaData(test);
-
- Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version ).build();
- functionMetaDataManager.setFunctionMetaData(m3);
-
- Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
- .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
- .setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version +1).build();
- serviceRequest = Request.ServiceRequest.newBuilder()
- .setServiceRequestType(Request.ServiceRequest.ServiceRequestType.UPDATE)
- .setFunctionMetaData(m4)
- .setWorkerId("worker-1")
- .build();
-
- functionMetaDataManager.proccessDeregister(serviceRequest);
- verify(schedulerManager, times(1)).schedule();
-
- Assert.assertEquals(test, functionMetaDataManager.functionMetaDataMap.get(
- "tenant-1").get("namespace-1").get("func-2"));
- Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get(
+ // delete now
+ m1 = m1.toBuilder().setVersion(2).build();
+ Assert.assertTrue(functionMetaDataManager.proccessDeregister(m1));
+ verify(functionMetaDataManager, times(1))
+ .setFunctionMetaData(any(Function.FunctionMetaData.class));
+ verify(schedulerManager, times(0)).schedule();
+ Assert.assertEquals(0, functionMetaDataManager.functionMetaDataMap.get(
"tenant-1").get("namespace-1").size());
}
}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
index b84af4b..784b7ed 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
@@ -25,9 +26,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.function.Function;
+import java.util.concurrent.TimeUnit;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -35,7 +36,6 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.functions.proto.Request.ServiceRequest;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.worker.request.ServiceRequestUtils;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterMethod;
@@ -63,7 +63,7 @@ public class FunctionMetaDataTopicTailerTest {
when(readerBuilder.subscriptionRolePrefix(anyString())).thenReturn(readerBuilder);
when(readerBuilder.create()).thenReturn(reader);
this.fsm = mock(FunctionMetaDataManager.class);
- this.fsc = new FunctionMetaDataTopicTailer(fsm, readerBuilder, new WorkerConfig(), ErrorNotifier.getDefaultImpl() );
+ this.fsc = new FunctionMetaDataTopicTailer(fsm, readerBuilder, new WorkerConfig(), MessageId.earliest, ErrorNotifier.getDefaultImpl() );
}
@AfterMethod
@@ -75,13 +75,13 @@ public class FunctionMetaDataTopicTailerTest {
@Test
public void testUpdate() throws Exception {
- ServiceRequest request = ServiceRequestUtils.getUpdateRequest(TEST_NAME, FunctionMetaData.newBuilder().build());
+ FunctionMetaData request = FunctionMetaData.newBuilder().build();
Message msg = mock(Message.class);
when(msg.getData()).thenReturn(request.toByteArray());
CountDownLatch readLatch = new CountDownLatch(1);
CountDownLatch processLatch = new CountDownLatch(1);
- when(reader.readNext()).thenReturn(msg).then(new Answer<Message>() {
+ when(reader.readNext(anyInt(), any(TimeUnit.class))).thenReturn(msg).then(new Answer<Message>() {
public Message answer(InvocationOnMock invocation) {
try {
readLatch.countDown();
@@ -97,7 +97,7 @@ public class FunctionMetaDataTopicTailerTest {
readLatch.await();
- verify(reader, times(2)).readNext();
- verify(fsm, times(1)).processRequest(any(), any(ServiceRequest.class));
+ verify(reader, times(2)).readNext(anyInt(), any(TimeUnit.class));
+ verify(fsm, times(1)).processMetaDataTopicMessage(any(Message.class));
}
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
deleted file mode 100644
index 501ce1a..0000000
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker.request;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertSame;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.functions.proto.Request.ServiceRequest;
-import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link ServiceRequestManager}.
- */
-@PrepareForTest(WorkerUtils.class)
-public class ServiceRequestManagerTest {
-
- private final Producer producer;
- private final ServiceRequestManager reqMgr;
-
- public ServiceRequestManagerTest() throws Exception {
- this.producer = mock(Producer.class);
- this.reqMgr = new ServiceRequestManager(producer);
- }
-
- @AfterMethod
- public void tearDown() throws Exception {
- reqMgr.close();
- verify(producer, times(1)).close();
- }
-
- @Test
- public void testSubmitRequest() throws Exception {
- ServiceRequest request = ServiceRequest.newBuilder().build();
- MessageId msgId = mock(MessageId.class);
-
- when(producer.sendAsync(any(byte[].class)))
- .thenReturn(CompletableFuture.completedFuture(msgId));
-
- CompletableFuture<MessageId> submitFuture = reqMgr.submitRequest(request);
- assertSame(msgId, submitFuture.get());
- verify(producer, times(1)).sendAsync(any(byte[].class));
- }
-
-}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 88142e6..2a50623 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker.rest.api.v2;
import static org.apache.pulsar.functions.utils.FunctionCommon.mergeJson;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -48,7 +49,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
@@ -56,13 +56,13 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
-import org.apache.pulsar.client.admin.Namespaces;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Functions;
+import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
@@ -77,12 +77,7 @@ import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -132,6 +127,7 @@ public class FunctionApiV2ResourceTest {
private PulsarAdmin mockedPulsarAdmin;
private Tenants mockedTenants;
private Namespaces mockedNamespaces;
+ private Functions mockedFunctions;
private TenantInfo mockedTenantInfo;
private List<String> namespaceList = new LinkedList<>();
private FunctionMetaDataManager mockedManager;
@@ -142,6 +138,7 @@ public class FunctionApiV2ResourceTest {
private InputStream mockedInputStream;
private FormDataContentDisposition mockedFormData;
private FunctionMetaData mockedFunctionMetadata;
+ private LeaderService mockedLeaderService;
@BeforeMethod
public void setup() throws Exception {
@@ -156,20 +153,26 @@ public class FunctionApiV2ResourceTest {
this.mockedPulsarAdmin = mock(PulsarAdmin.class);
this.mockedTenants = mock(Tenants.class);
this.mockedNamespaces = mock(Namespaces.class);
+ this.mockedFunctions = mock(Functions.class);
+ this.mockedLeaderService = mock(LeaderService.class);
this.mockedFunctionMetadata = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
namespaceList.add(tenant + "/" + namespace);
this.mockedWorkerService = mock(WorkerService.class);
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+ when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService);
when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
when(mockedWorkerService.isInitialized()).thenReturn(true);
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+ when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin);
when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+ when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+ when(mockedLeaderService.isLeader()).thenReturn(true);
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetadata);
// worker config
@@ -495,11 +498,6 @@ public class FunctionApiV2ResourceTest {
any(File.class),
any(Namespace.class));
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -584,12 +582,6 @@ public class FunctionApiV2ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
registerDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -631,12 +623,8 @@ public class FunctionApiV2ResourceTest {
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
-
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("function failed to register");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("function failed to register"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
registerDefaultFunction();
} catch (RestException re) {
@@ -645,7 +633,7 @@ public class FunctionApiV2ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
public void testRegisterFunctionInterrupted() throws Exception {
try {
mockStatic(WorkerUtils.class);
@@ -658,10 +646,8 @@ public class FunctionApiV2ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function registeration interrupted"));
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
+ doThrow(new IllegalStateException("Function registration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
registerDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
@@ -674,7 +660,7 @@ public class FunctionApiV2ResourceTest {
//
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
- public void testUpdateFunctionMissingTenant() {
+ public void testUpdateFunctionMissingTenant() throws Exception {
try {
testUpdateFunctionMissingArguments(
null,
@@ -695,7 +681,7 @@ public class FunctionApiV2ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
- public void testUpdateFunctionMissingNamespace() {
+ public void testUpdateFunctionMissingNamespace() throws Exception {
try {
testUpdateFunctionMissingArguments(
tenant,
@@ -716,7 +702,7 @@ public class FunctionApiV2ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function name is not provided")
- public void testUpdateFunctionMissingFunctionName() {
+ public void testUpdateFunctionMissingFunctionName() throws Exception {
try {
testUpdateFunctionMissingArguments(
tenant,
@@ -899,7 +885,7 @@ public class FunctionApiV2ResourceTest {
String outputSerdeClassName,
String className,
Integer parallelism,
- String expectedError) {
+ String expectedError) throws Exception {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
FunctionConfig functionConfig = new FunctionConfig();
@@ -929,12 +915,9 @@ public class FunctionApiV2ResourceTest {
}
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
- if (expectedError == null) {
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ if (expectedError != null) {
+ doThrow(new IllegalArgumentException(expectedError))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
}
try {
@@ -1023,12 +1006,6 @@ public class FunctionApiV2ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
updateDefaultFunction();
}
@@ -1053,11 +1030,6 @@ public class FunctionApiV2ResourceTest {
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
try {
resource.updateFunction(
@@ -1088,11 +1060,8 @@ public class FunctionApiV2ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("function failed to register");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("function failed to register"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
updateDefaultFunction();
} catch (RestException re) {
@@ -1101,7 +1070,7 @@ public class FunctionApiV2ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registeration interrupted")
public void testUpdateFunctionInterrupted() throws Exception {
try {
mockStatic(WorkerUtils.class);
@@ -1116,9 +1085,8 @@ public class FunctionApiV2ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function registeration interrupted"));
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function registeration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
updateDefaultFunction();
} catch (RestException re) {
@@ -1209,25 +1177,16 @@ public class FunctionApiV2ResourceTest {
public void testDeregisterFunctionSuccess() {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function deregistered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
-
deregisterDefaultFunction();
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to deregister")
- public void testDeregisterFunctionFailure() {
+ public void testDeregisterFunctionFailure() throws Exception {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("function failed to deregister");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("function failed to deregister"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
deregisterDefaultFunction();
} catch (RestException re) {
@@ -1237,13 +1196,12 @@ public class FunctionApiV2ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregisteration interrupted")
- public void testDeregisterFunctionInterrupted() {
+ public void testDeregisterFunctionInterrupted() throws Exception {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function deregisteration interrupted"));
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function deregisteration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
deregisterDefaultFunction();
}
@@ -1476,10 +1434,6 @@ public class FunctionApiV2ResourceTest {
String filePackageUrl = "file://" + fileLocation;
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
- RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
@@ -1514,10 +1468,6 @@ public class FunctionApiV2ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
- RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index f1128ef..395951a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -44,7 +44,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.ws.rs.core.Response;
@@ -53,9 +52,10 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
-import org.apache.pulsar.client.admin.Namespaces;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Functions;
+import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -74,15 +74,11 @@ import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.apache.pulsar.functions.worker.rest.api.v2.FunctionsApiV2Resource;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -136,6 +132,7 @@ public class FunctionApiV3ResourceTest {
private PulsarAdmin mockedPulsarAdmin;
private Tenants mockedTenants;
private Namespaces mockedNamespaces;
+ private Functions mockedFunctions;
private TenantInfo mockedTenantInfo;
private List<String> namespaceList = new LinkedList<>();
private FunctionMetaDataManager mockedManager;
@@ -146,6 +143,7 @@ public class FunctionApiV3ResourceTest {
private InputStream mockedInputStream;
private FormDataContentDisposition mockedFormData;
private FunctionMetaData mockedFunctionMetadata;
+ private LeaderService mockedLeaderService;
@BeforeMethod
public void setup() throws Exception {
@@ -160,20 +158,26 @@ public class FunctionApiV3ResourceTest {
this.mockedPulsarAdmin = mock(PulsarAdmin.class);
this.mockedTenants = mock(Tenants.class);
this.mockedNamespaces = mock(Namespaces.class);
+ this.mockedFunctions = mock(Functions.class);
+ this.mockedLeaderService = mock(LeaderService.class);
this.mockedFunctionMetadata = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
namespaceList.add(tenant + "/" + namespace);
this.mockedWorkerService = mock(WorkerService.class);
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
+ when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService);
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
when(mockedWorkerService.isInitialized()).thenReturn(true);
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+ when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin);
when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+ when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+ when(mockedLeaderService.isLeader()).thenReturn(true);
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetadata);
// worker config
@@ -609,12 +613,6 @@ public class FunctionApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
registerDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -658,11 +656,8 @@ public class FunctionApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("function failed to register");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("function failed to register"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
registerDefaultFunction();
} catch (RestException re) {
@@ -671,7 +666,7 @@ public class FunctionApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
public void testRegisterFunctionInterrupted() throws Exception {
try {
mockStatic(WorkerUtils.class);
@@ -685,9 +680,8 @@ public class FunctionApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function registeration interrupted"));
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function registration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
registerDefaultFunction();
} catch (RestException re) {
@@ -701,7 +695,7 @@ public class FunctionApiV3ResourceTest {
//
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
- public void testUpdateFunctionMissingTenant() {
+ public void testUpdateFunctionMissingTenant() throws Exception {
try {
testUpdateFunctionMissingArguments(
null,
@@ -722,7 +716,7 @@ public class FunctionApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
- public void testUpdateFunctionMissingNamespace() {
+ public void testUpdateFunctionMissingNamespace() throws Exception {
try {
testUpdateFunctionMissingArguments(
tenant,
@@ -743,7 +737,7 @@ public class FunctionApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function name is not provided")
- public void testUpdateFunctionMissingFunctionName() {
+ public void testUpdateFunctionMissingFunctionName() throws Exception {
try {
testUpdateFunctionMissingArguments(
tenant,
@@ -926,7 +920,7 @@ public class FunctionApiV3ResourceTest {
String outputSerdeClassName,
String className,
Integer parallelism,
- String expectedError) {
+ String expectedError) throws Exception {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
FunctionConfig functionConfig = new FunctionConfig();
@@ -956,12 +950,9 @@ public class FunctionApiV3ResourceTest {
}
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
- if (expectedError == null) {
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ if (expectedError != null) {
+ doThrow(new IllegalArgumentException(expectedError))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
}
resource.updateFunction(
@@ -1042,12 +1033,6 @@ public class FunctionApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
updateDefaultFunction();
}
@@ -1070,11 +1055,6 @@ public class FunctionApiV3ResourceTest {
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
resource.updateFunction(
tenant,
@@ -1101,11 +1081,8 @@ public class FunctionApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("function failed to register");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("function failed to register"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
updateDefaultFunction();
} catch (RestException re) {
@@ -1114,7 +1091,7 @@ public class FunctionApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registeration interrupted")
public void testUpdateFunctionInterrupted() throws Exception {
try {
mockStatic(WorkerUtils.class);
@@ -1127,9 +1104,8 @@ public class FunctionApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function registeration interrupted"));
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function registeration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
updateDefaultFunction();
} catch (RestException re) {
@@ -1220,25 +1196,16 @@ public class FunctionApiV3ResourceTest {
public void testDeregisterFunctionSuccess() {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function deregistered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
-
deregisterDefaultFunction();
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to deregister")
- public void testDeregisterFunctionFailure() {
+ public void testDeregisterFunctionFailure() throws Exception {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("function failed to deregister");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("function failed to deregister"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
deregisterDefaultFunction();
} catch (RestException re) {
@@ -1248,13 +1215,12 @@ public class FunctionApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregisteration interrupted")
- public void testDeregisterFunctionInterrupted() {
+ public void testDeregisterFunctionInterrupted() throws Exception {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function deregisteration interrupted"));
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function deregisteration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
deregisterDefaultFunction();
}
@@ -1520,10 +1486,6 @@ public class FunctionApiV3ResourceTest {
String filePackageUrl = "file://" + fileLocation;
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
- RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
@@ -1553,10 +1515,6 @@ public class FunctionApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
- RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
@@ -1580,10 +1538,6 @@ public class FunctionApiV3ResourceTest {
String filePackageUrl = "file://" + fileLocation;
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
- RequestResult rr = new RequestResult().setSuccess(true).setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index a93077c..6b55106 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -31,7 +32,6 @@ import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ClassLoaderUtils;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -42,15 +42,11 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
import org.apache.pulsar.io.cassandra.CassandraStringSink;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -69,7 +65,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
@@ -117,6 +112,7 @@ public class SinkApiV3ResourceTest {
private PulsarAdmin mockedPulsarAdmin;
private Tenants mockedTenants;
private Namespaces mockedNamespaces;
+ private Functions mockedFunctions;
private TenantInfo mockedTenantInfo;
private List<String> namespaceList = new LinkedList<>();
private FunctionMetaDataManager mockedManager;
@@ -127,6 +123,7 @@ public class SinkApiV3ResourceTest {
private InputStream mockedInputStream;
private FormDataContentDisposition mockedFormData;
private FunctionMetaData mockedFunctionMetaData;
+ private LeaderService mockedLeaderService;
@BeforeMethod
public void setup() throws Exception {
@@ -141,19 +138,25 @@ public class SinkApiV3ResourceTest {
this.mockedPulsarAdmin = mock(PulsarAdmin.class);
this.mockedTenants = mock(Tenants.class);
this.mockedNamespaces = mock(Namespaces.class);
+ this.mockedFunctions = mock(Functions.class);
+ this.mockedLeaderService = mock(LeaderService.class);
namespaceList.add(tenant + "/" + namespace);
this.mockedWorkerService = mock(WorkerService.class);
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+ when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService);
when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
when(mockedWorkerService.isInitialized()).thenReturn(true);
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+ when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin);
when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+ when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+ when(mockedLeaderService.isLeader()).thenReturn(true);
URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
if (file == null) {
@@ -162,7 +165,6 @@ public class SinkApiV3ResourceTest {
JAR_FILE_PATH = file.getFile();
INVALID_JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(INVALID_JAR_FILE_NAME).getFile();
-
// worker config
WorkerConfig workerConfig = new WorkerConfig()
.setWorkerId("test")
@@ -535,12 +537,6 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("sink registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
registerDefaultSink();
}
@@ -563,12 +559,6 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant(tenant);
sinkConfig.setNamespace(namespace);
@@ -601,11 +591,8 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("sink failed to register");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("sink failed to register"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
registerDefaultSink();
} catch (RestException re){
@@ -614,7 +601,7 @@ public class SinkApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
public void testRegisterSinkInterrupted() throws Exception {
try {
mockStatic(WorkerUtils.class);
@@ -628,9 +615,8 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function registeration interrupted"));
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function registration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
registerDefaultSink();
} catch (RestException re){
@@ -851,12 +837,9 @@ public class SinkApiV3ResourceTest {
sinkConfig.setParallelism(parallelism);
}
- if (expectedError == null) {
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ if (expectedError != null) {
+ doThrow(new IllegalArgumentException(expectedError))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
}
resource.updateSink(
@@ -957,12 +940,6 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
updateDefaultSink();
}
@@ -1001,12 +978,6 @@ public class SinkApiV3ResourceTest {
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
resource.updateSink(
tenant,
namespace,
@@ -1031,11 +1002,8 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("sink failed to register");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("sink failed to register"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
updateDefaultSink();
} catch (RestException re) {
@@ -1044,7 +1012,7 @@ public class SinkApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
public void testUpdateSinkInterrupted() throws Exception {
try {
mockStatic(WorkerUtils.class);
@@ -1057,9 +1025,8 @@ public class SinkApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function registeration interrupted"));
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function registration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
updateDefaultSink();
} catch (RestException re) {
@@ -1136,7 +1103,7 @@ public class SinkApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink doesn't exist")
- public void testDeregisterNotExistedSink() {
+ public void testDeregisterNotExistedSink() {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
deregisterDefaultSink();
@@ -1147,28 +1114,19 @@ public class SinkApiV3ResourceTest {
}
@Test
- public void testDeregisterSinkSuccess() {
+ public void testDeregisterSinkSuccess() throws Exception {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
-
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source deregistered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to deregister")
- public void testDeregisterSinkFailure() {
+ public void testDeregisterSinkFailure() throws Exception {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(FunctionMetaData.newBuilder().build());
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("sink failed to deregister");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("sink failed to deregister"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
deregisterDefaultSink();
} catch (RestException re) {
@@ -1178,15 +1136,14 @@ public class SinkApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregistration interrupted")
- public void testDeregisterSinkInterrupted() {
+ public void testDeregisterSinkInterrupted() throws Exception {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(FunctionMetaData.newBuilder().build());
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function deregistration interrupted"));
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function deregistration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
deregisterDefaultSink();
} catch (RestException re) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index d36c9a9..fe9a117 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -39,13 +39,13 @@ import java.io.InputStream;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import javax.ws.rs.core.Response;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -54,7 +54,6 @@ import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ClassLoaderUtils;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -69,15 +68,11 @@ import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.WorkerConfig;
-import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.WorkerUtils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.*;
import org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
import org.apache.pulsar.io.twitter.TwitterFireHose;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -114,6 +109,7 @@ public class SourceApiV3ResourceTest {
private PulsarAdmin mockedPulsarAdmin;
private Tenants mockedTenants;
private Namespaces mockedNamespaces;
+ private Functions mockedFunctions;
private TenantInfo mockedTenantInfo;
private List<String> namespaceList = new LinkedList<>();
private FunctionMetaDataManager mockedManager;
@@ -124,6 +120,7 @@ public class SourceApiV3ResourceTest {
private InputStream mockedInputStream;
private FormDataContentDisposition mockedFormData;
private FunctionMetaData mockedFunctionMetaData;
+ private LeaderService mockedLeaderService;
@BeforeMethod
public void setup() throws Exception {
@@ -138,19 +135,25 @@ public class SourceApiV3ResourceTest {
this.mockedPulsarAdmin = mock(PulsarAdmin.class);
this.mockedTenants = mock(Tenants.class);
this.mockedNamespaces = mock(Namespaces.class);
+ this.mockedFunctions = mock(Functions.class);
+ this.mockedLeaderService = mock(LeaderService.class);
namespaceList.add(tenant + "/" + namespace);
this.mockedWorkerService = mock(WorkerService.class);
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+ when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService);
when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
when(mockedWorkerService.isInitialized()).thenReturn(true);
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+ when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin);
when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+ when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+ when(mockedLeaderService.isLeader()).thenReturn(true);
URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
if (file == null) {
@@ -503,12 +506,6 @@ public class SourceApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
registerDefaultSource();
}
@@ -530,12 +527,6 @@ public class SourceApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
SourceConfig sourceConfig = new SourceConfig();
sourceConfig.setTenant(tenant);
sourceConfig.setNamespace(namespace);
@@ -569,11 +560,8 @@ public class SourceApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("source failed to register");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("source failed to register"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
registerDefaultSource();
} catch (RestException re){
@@ -582,7 +570,7 @@ public class SourceApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registration interrupted")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
public void testRegisterSourceInterrupted() throws Exception {
try {
mockStatic(WorkerUtils.class);
@@ -596,9 +584,8 @@ public class SourceApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function registration interrupted"));
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function registration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
registerDefaultSource();
} catch (RestException re){
@@ -870,12 +857,9 @@ public class SourceApiV3ResourceTest {
sourceConfig.setParallelism(parallelism);
}
- if (expectedError == null) {
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ if (expectedError != null) {
+ doThrow(new IllegalArgumentException(expectedError))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
}
resource.updateSource(
@@ -972,12 +956,6 @@ public class SourceApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
updateDefaultSource();
}
@@ -996,6 +974,7 @@ public class SourceApiV3ResourceTest {
sourceConfig.setClassName(className);
sourceConfig.setParallelism(parallelism);
+ when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
mockStatic(ConnectorUtils.class);
doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
@@ -1013,14 +992,6 @@ public class SourceApiV3ResourceTest {
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
-
- when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
resource.updateSource(
tenant,
namespace,
@@ -1047,11 +1018,8 @@ public class SourceApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("source failed to register");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("source failed to register"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
updateDefaultSource();
} catch (RestException re){
@@ -1060,7 +1028,7 @@ public class SourceApiV3ResourceTest {
}
}
- @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registration interrupted")
+ @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
public void testUpdateSourceInterrupted() throws Exception {
try {
mockStatic(WorkerUtils.class);
@@ -1074,9 +1042,8 @@ public class SourceApiV3ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function registration interrupted"));
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function registration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
updateDefaultSource();
} catch (RestException re){
@@ -1169,27 +1136,18 @@ public class SourceApiV3ResourceTest {
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(FunctionMetaData.newBuilder().build());
- RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("source deregistered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
-
deregisterDefaultSource();
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to deregister")
- public void testDeregisterSourceFailure() {
+ public void testDeregisterSourceFailure() throws Exception {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(FunctionMetaData.newBuilder().build());
- RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("source failed to deregister");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
+ doThrow(new IllegalArgumentException("source failed to deregister"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
deregisterDefaultSource();
} catch (RestException re){
@@ -1199,15 +1157,14 @@ public class SourceApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregistration interrupted")
- public void testDeregisterSourceInterrupted() {
+ public void testDeregisterSourceInterrupted() throws Exception {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(FunctionMetaData.newBuilder().build());
- CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function deregistration interrupted"));
- when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
+ doThrow(new IllegalStateException("Function deregistration interrupted"))
+ .when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), Mockito.anyBoolean());
deregisterDefaultSource();
} catch (RestException re){