You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/23 05:12:31 UTC
[pulsar] branch master updated: [PulsarAdmin] Functions to async
(#6580)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 75d43eb [PulsarAdmin] Functions to async (#6580)
75d43eb is described below
commit 75d43ebb8368920a3a7b3c51d26f1bb59c66da28
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Mon Mar 23 13:12:16 2020 +0800
[PulsarAdmin] Functions to async (#6580)
---
.../org/apache/pulsar/client/admin/Functions.java | 375 ++++++++++-
.../client/admin/internal/FunctionsImpl.java | 737 ++++++++++++++++-----
2 files changed, 942 insertions(+), 170 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index d6a7ff3..50b7bb5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -54,6 +55,20 @@ public interface Functions {
List<String> getFunctions(String tenant, String namespace) throws PulsarAdminException;
/**
+ * Get the list of functions asynchronously.
+ * <p>
+ * Get the list of all the Pulsar functions.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["f1", "f2", "f3"]</code>
+ * </pre>
+ *
+ */
+ CompletableFuture<List<String>> getFunctionsAsync(String tenant, String namespace);
+
+ /**
* Get the configuration for the specified function.
* <p>
* Response Example:
@@ -81,6 +96,27 @@ public interface Functions {
FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException;
/**
+ * Get the configuration for the specified function asynchronously.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>{ serviceUrl : "http://my-broker.example.com:8080/" }</code>
+ * </pre>
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ *
+ * @return a future that completes with the function configuration
+ *
+ */
+ CompletableFuture<FunctionConfig> getFunctionAsync(String tenant, String namespace, String function);
+
+ /**
* Create a new function.
*
* @param functionConfig
@@ -92,12 +128,20 @@ public interface Functions {
void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException;
/**
- * <pre>
+ * Create a new function asynchronously.
+ *
+ * @param functionConfig
+ * the function configuration object
+ */
+ CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig, String fileName);
+
+ /**
+ * Create a new function with package url.
+ * <p>
* Create a new function by providing url from which fun-pkg can be downloaded. supported url: http/file
* eg:
* File: file:/dir/fileName.jar
* Http: http://www.repo.com/fileName.jar
- * </pre>
*
* @param functionConfig
* the function configuration object
@@ -108,6 +152,21 @@ public interface Functions {
void createFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException;
/**
+ * Create a new function with package url asynchronously.
+ * <p>
+ * Create a new function by providing url from which fun-pkg can be downloaded. supported url: http/file
+ * eg:
+ * File: file:/dir/fileName.jar
+ * Http: http://www.repo.com/fileName.jar
+ *
+ * @param functionConfig
+ * the function configuration object
+ * @param pkgUrl
+ * url from which pkg can be downloaded
+ */
+ CompletableFuture<Void> createFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl);
+
+ /**
* Update the configuration for a function.
* <p>
*
@@ -124,6 +183,15 @@ public interface Functions {
void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException;
/**
+ * Update the configuration for a function asynchronously.
+ * <p>
+ *
+ * @param functionConfig
+ * the function configuration object
+ */
+ CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName);
+
+ /**
* Update the configuration for a function.
* <p>
*
@@ -141,13 +209,23 @@ public interface Functions {
void updateFunction(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException;
/**
+ * Update the configuration for a function asynchronously.
+ * <p>
+ *
+ * @param functionConfig
+ * the function configuration object
+ * @param updateOptions
+ * options for the update operations
+ */
+ CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions);
+
+ /**
* Update the configuration for a function.
- * <pre>
+ * <p>
* Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
* eg:
* File: file:/dir/fileName.jar
* Http: http://www.repo.com/fileName.jar
- * </pre>
*
* @param functionConfig
* the function configuration object
@@ -163,13 +241,27 @@ public interface Functions {
void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException;
/**
+ * Update the configuration for a function asynchronously.
+ * <p>
+ * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
+ * eg:
+ * File: file:/dir/fileName.jar
+ * Http: http://www.repo.com/fileName.jar
+ *
+ * @param functionConfig
+ * the function configuration object
+ * @param pkgUrl
+ * url from which pkg can be downloaded
+ */
+ CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl);
+
+ /**
* Update the configuration for a function.
- * <pre>
+ * <p>
* Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
* eg:
* File: file:/dir/fileName.jar
* Http: http://www.repo.com/fileName.jar
- * </pre>
*
* @param functionConfig
* the function configuration object
@@ -186,9 +278,25 @@ public interface Functions {
*/
void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException;
+ /**
+ * Update the configuration for a function asynchronously.
+ * <p>
+ * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
+ * eg:
+ * File: file:/dir/fileName.jar
+ * Http: http://www.repo.com/fileName.jar
+ *
+ * @param functionConfig
+ * the function configuration object
+ * @param pkgUrl
+ * url from which pkg can be downloaded
+ * @param updateOptions
+ * options for the update operations
+ */
+ CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions);
/**
- * Delete an existing function
+ * Delete an existing function.
* <p>
* Delete a function
*
@@ -211,6 +319,20 @@ public interface Functions {
void deleteFunction(String tenant, String namespace, String function) throws PulsarAdminException;
/**
+ * Delete an existing function asynchronously.
+ * <p>
+ * Delete a function
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ */
+ CompletableFuture<Void> deleteFunctionAsync(String tenant, String namespace, String function);
+
+ /**
* Gets the current status of a function.
*
* @param tenant
@@ -226,6 +348,18 @@ public interface Functions {
FunctionStatus getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException;
/**
+ * Gets the current status of a function asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ */
+ CompletableFuture<FunctionStatus> getFunctionStatusAsync(String tenant, String namespace, String function);
+
+ /**
* Gets the current status of a function instance.
*
* @param tenant
@@ -243,6 +377,21 @@ public interface Functions {
throws PulsarAdminException;
/**
+ * Gets the current status of a function instance asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ * @param id
+ * Function instance-id
+ * @return a future that completes with the status data of the given function instance
+ */
+ CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> getFunctionStatusAsync(String tenant, String namespace, String function, int id);
+
+ /**
* Gets the current stats of a function instance.
*
* @param tenant
@@ -260,6 +409,21 @@ public interface Functions {
throws PulsarAdminException;
/**
+ * Gets the current stats of a function instance asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ * @param id
+ * Function instance-id
+ * @return a future that completes with the stats data of the given function instance
+ */
+ CompletableFuture<FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData> getFunctionStatsAsync(String tenant, String namespace, String function, int id);
+
+ /**
* Gets the current stats of a function.
*
* @param tenant
@@ -276,7 +440,21 @@ public interface Functions {
throws PulsarAdminException;
/**
- * Restart function instance
+ * Gets the current stats of a function asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ * @return a future that completes with the stats of the function
+ */
+
+ CompletableFuture<FunctionStats> getFunctionStatsAsync(String tenant, String namespace, String function);
+
+ /**
+ * Restart function instance.
*
* @param tenant
* Tenant name
@@ -294,7 +472,22 @@ public interface Functions {
void restartFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
/**
- * Restart all function instances
+ * Restart function instance asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ *
+ * @param instanceId
+ * Function instanceId
+ */
+ CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function, int instanceId);
+
+ /**
+ * Restart all function instances.
*
* @param tenant
* Tenant name
@@ -308,9 +501,20 @@ public interface Functions {
*/
void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException;
+ /**
+ * Restart all function instances asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ */
+ CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function);
/**
- * Stop function instance
+ * Stop function instance.
*
* @param tenant
* Tenant name
@@ -328,7 +532,22 @@ public interface Functions {
void stopFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
/**
- * Start all function instances
+ * Stop function instance asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ *
+ * @param instanceId
+ * Function instanceId
+ */
+ CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function, int instanceId);
+
+ /**
+ * Start all function instances.
*
* @param tenant
* Tenant name
@@ -343,7 +562,19 @@ public interface Functions {
void startFunction(String tenant, String namespace, String function) throws PulsarAdminException;
/**
- * Start function instance
+ * Start all function instances asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ */
+ CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function);
+
+ /**
+ * Start function instance.
*
* @param tenant
* Tenant name
@@ -361,7 +592,22 @@ public interface Functions {
void startFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
/**
- * Stop all function instances
+ * Start function instance asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ *
+ * @param instanceId
+ * Function instanceId
+ */
+ CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function, int instanceId);
+
+ /**
+ * Stop all function instances.
*
* @param tenant
* Tenant name
@@ -375,6 +621,17 @@ public interface Functions {
*/
void stopFunction(String tenant, String namespace, String function) throws PulsarAdminException;
+ /**
+ * Stop all function instances asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ */
+ CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function);
/**
* Triggers the function by writing to the input topic.
@@ -396,6 +653,22 @@ public interface Functions {
String triggerFunction(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile) throws PulsarAdminException;
/**
+ * Triggers the function by writing to the input topic asynchronously.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ * @param triggerValue
+ * The input that will be written to input topic
+ * @param triggerFile
+ * The file which contains the input that will be written to input topic
+ */
+ CompletableFuture<String> triggerFunctionAsync(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile);
+
+ /**
* Upload Data.
*
* @param sourceFile
@@ -409,6 +682,16 @@ public interface Functions {
void uploadFunction(String sourceFile, String path) throws PulsarAdminException;
/**
+ * Upload Data asynchronously.
+ *
+ * @param sourceFile
+ * dataFile that needs to be uploaded
+ * @param path
+ * Path where data should be stored
+ */
+ CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String path);
+
+ /**
* Download Function Code.
*
* @param destinationFile
@@ -425,6 +708,16 @@ public interface Functions {
* Download Function Code.
*
* @param destinationFile
+ * file where data should be downloaded to
+ * @param path
+ * Path where data is located
+ */
+ CompletableFuture<Void> downloadFunctionAsync(String destinationFile, String path);
+
+ /**
+ * Download Function Code.
+ *
+ * @param destinationFile
* file where data should be downloaded to
* @param tenant
* Tenant name
@@ -437,6 +730,20 @@ public interface Functions {
void downloadFunction(String destinationFile, String tenant, String namespace, String function) throws PulsarAdminException;
/**
+ * Download Function Code asynchronously.
+ *
+ * @param destinationFile
+ * file where data should be downloaded to
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ */
+ CompletableFuture<Void> downloadFunctionAsync(String destinationFile, String tenant, String namespace, String function);
+
+ /**
* Deprecated in favor of getting sources and sinks for their own APIs
*
* Fetches a list of supported Pulsar IO connectors currently running in cluster mode
@@ -502,6 +809,28 @@ public interface Functions {
FunctionState getFunctionState(String tenant, String namespace, String function, String key) throws PulsarAdminException;
/**
+ * Fetch the current state associated with a Pulsar Function asynchronously.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>{ "value : 12, version : 2"}</code>
+ * </pre>
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ * @param key
+ * Key name of State
+ *
+ * @return a future that completes with the function state
+ */
+ CompletableFuture<FunctionState> getFunctionStateAsync(String tenant, String namespace, String function, String key);
+
+ /**
* Puts the given state associated with a Pulsar Function.
* <p>
* Response Example:
@@ -527,4 +856,24 @@ public interface Functions {
* Unexpected error
*/
void putFunctionState(String tenant, String namespace, String function, FunctionState state) throws PulsarAdminException;
+
+ /**
+ * Puts the given state associated with a Pulsar Function asynchronously.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>{ "value : 12, version : 2"}</code>
+ * </pre>
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ * @param state
+ * FunctionState
+ */
+ CompletableFuture<Void> putFunctionStateAsync(String tenant, String namespace, String function, FunctionState state);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index bc2982d..a5b323c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.client.admin.internal;
import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
import io.netty.handler.codec.http.HttpHeaders;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -47,6 +46,7 @@ import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
@@ -56,7 +56,10 @@ import java.io.FileOutputStream;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.asynchttpclient.Dsl.get;
@@ -78,188 +81,408 @@ public class FunctionsImpl extends ComponentResource implements Functions {
@Override
public List<String> getFunctions(String tenant, String namespace) throws PulsarAdminException {
try {
- Response response = request(functions.path(tenant).path(namespace)).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw getApiException(response);
- }
- return response.readEntity(new GenericType<List<String>>() {
- });
- } catch (Exception e) {
- throw getApiException(e);
+ return getFunctionsAsync(tenant, namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<List<String>> getFunctionsAsync(String tenant, String namespace) {
+ WebTarget path = functions.path(tenant).path(namespace);
+ final CompletableFuture<List<String>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(getApiException(response));
+ } else {
+ List<String> functions
+ = response.readEntity(new GenericType<List<String>>() {});
+ future.complete(functions);
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
try {
- Response response = request(functions.path(tenant).path(namespace).path(function)).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw getApiException(response);
- }
- return response.readEntity(FunctionConfig.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getFunctionAsync(tenant, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<FunctionConfig> getFunctionAsync(String tenant, String namespace, String function) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function);
+ final CompletableFuture<FunctionConfig> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(getApiException(response));
+ } else {
+ future.complete(response.readEntity(FunctionConfig.class));
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public FunctionStatus getFunctionStatus(
String tenant, String namespace, String function) throws PulsarAdminException {
try {
- Response response = request(functions.path(tenant).path(namespace).path(function).path("status")).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw getApiException(response);
- }
- return response.readEntity(FunctionStatus.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getFunctionStatusAsync(tenant, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<FunctionStatus> getFunctionStatusAsync(String tenant, String namespace, String function) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path("status");
+ final CompletableFuture<FunctionStatus> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(getApiException(response));
+ } else {
+ future.complete(response.readEntity(FunctionStatus.class));
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionStatus(
String tenant, String namespace, String function, int id) throws PulsarAdminException {
try {
- Response response = request(
- functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status"))
- .get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw getApiException(response);
- }
- return response.readEntity(FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getFunctionStatusAsync(tenant, namespace, function, id).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> getFunctionStatusAsync(String tenant, String namespace, String function, int id) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status");
+ final CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(getApiException(response));
+ } else {
+ future.complete(response.readEntity(
+ FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class));
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionStats(String tenant, String namespace, String function, int id) throws PulsarAdminException {
try {
- Response response = request(
- functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats")).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw getApiException(response);
- }
- return response.readEntity(FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getFunctionStatsAsync(tenant, namespace, function, id).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData> getFunctionStatsAsync(String tenant, String namespace, String function, int id) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats");
+ final CompletableFuture<FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(getApiException(response));
+ } else {
+ future.complete(response.readEntity(
+ FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class));
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public FunctionStats getFunctionStats(String tenant, String namespace, String function) throws PulsarAdminException {
try {
- Response response = request(
- functions.path(tenant).path(namespace).path(function).path("stats")).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw getApiException(response);
- }
- return response.readEntity(FunctionStats.class);
- } catch (Exception e) {
- throw getApiException(e);
- } }
+ return getFunctionStatsAsync(tenant, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<FunctionStats> getFunctionStatsAsync(String tenant, String namespace, String function) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path("stats");
+ final CompletableFuture<FunctionStats> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(getApiException(response));
+ } else {
+ future.complete(response.readEntity(FunctionStats.class));
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
@Override
public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
try {
+ createFunctionAsync(functionConfig, fileName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig, String fileName) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
RequestBuilder builder = post(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString())
.addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON));
if (fileName != null && !fileName.startsWith("builtin://")) {
// If the function code is built in, we don't need to submit here
- builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
- }
- org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
-
- if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
- throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+ builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
+ asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture().thenAccept(response -> {
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()));
+ } else {
+ future.complete(null);
+ }
+ });
} catch (Exception e) {
- throw getApiException(e);
+ future.completeExceptionally(e);
}
+ return future;
}
@Override
public void createFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException {
try {
- final FormDataMultiPart mp = new FormDataMultiPart();
+ createFunctionWithUrlAsync(functionConfig, pkgUrl).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
- mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+ @Override
+ public CompletableFuture<Void> createFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl) {
+ WebTarget path = functions.path(functionConfig.getTenant())
+ .path(functionConfig.getNamespace()).path(functionConfig.getName());
- mp.bodyPart(new FormDataBodyPart("functionConfig",
- new Gson().toJson(functionConfig),
- MediaType.APPLICATION_JSON_TYPE));
- request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
- .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
- }
+ final FormDataMultiPart mp = new FormDataMultiPart();
+ mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+ mp.bodyPart(new FormDataBodyPart(
+ "functionConfig", new Gson().toJson(functionConfig), MediaType.APPLICATION_JSON_TYPE));
+
+ return asyncPostRequest(path, Entity.entity(mp, MediaType.MULTIPART_FORM_DATA));
}
@Override
public void deleteFunction(String cluster, String namespace, String function) throws PulsarAdminException {
try {
- request(functions.path(cluster).path(namespace).path(function))
- .delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ deleteFunctionAsync(cluster, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // `cluster` is passed as the async method's `tenant` argument; waits at most readTimeoutMs
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> deleteFunctionAsync(String tenant, String namespace, String function) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function); // target: <tenant>/<namespace>/<function>
+ return asyncDeleteRequest(path); // the helper's future is returned as-is
+ }
+
+ @Override
public void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
updateFunction(functionConfig, fileName, null); // convenience overload: no UpdateOptions
}
- @Override
+ @Override
+ public CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName) {
+ return updateFunctionAsync(functionConfig, fileName, null); // convenience overload: no UpdateOptions
+ }
+
+ @Override
public void updateFunction(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException {
try {
+ updateFunctionAsync(functionConfig, fileName, updateOptions)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async call
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ /**
+ * Updates a function asynchronously with a multipart PUT to {@code <tenant>/<namespace>/<name>}.
+ *
+ * @param functionConfig supplies the tenant, namespace and name for the request path; sent as the JSON part "functionConfig"
+ * @param fileName local package file sent as the "data" part; skipped when null or when it starts with "builtin://"
+ * @param updateOptions sent as the JSON part "updateOptions" when non-null
+ * @return a future that completes with null on a 2xx response, and exceptionally (via getApiException) on any
+ * other status, on a request-building error, or on a transport failure
+ */
+ @Override
+ public CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
RequestBuilder builder = put(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString())
.addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON));
if (updateOptions != null) {
- builder.addBodyPart(new StringPart("updateOptions", ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions), MediaType.APPLICATION_JSON));
+ builder.addBodyPart(new StringPart("updateOptions", ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions), MediaType.APPLICATION_JSON));
}
if (fileName != null && !fileName.startsWith("builtin://")) {
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
- org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
- if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
- throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
- }
+ asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture().thenAccept(response -> {
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()));
+ } else {
+ future.complete(null);
+ }
+ }).exceptionally(throwable -> {
+ // Without this stage a transport failure (or an exception thrown by the callback above) would leave
+ // the returned future incomplete, and blocking callers would only ever observe a timeout.
+ Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
+ future.completeExceptionally(getApiException(cause));
+ return null;
+ });
} catch (Exception e) {
- throw getApiException(e);
+ future.completeExceptionally(getApiException(e));
}
+ return future;
}
@Override
public void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException {
try {
- final FormDataMultiPart mp = new FormDataMultiPart();
+ updateFunctionWithUrlAsync(functionConfig, pkgUrl, updateOptions)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async call
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+ @Override
+ public CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) {
+ final CompletableFuture<Void> future = new CompletableFuture<>(); // only returned when building the multipart body fails
+ try {
+ final FormDataMultiPart mp = new FormDataMultiPart();
mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
-
mp.bodyPart(new FormDataBodyPart(
"functionConfig",
ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig),
MediaType.APPLICATION_JSON_TYPE));
-
if (updateOptions != null) {
mp.bodyPart(new FormDataBodyPart(
"updateOptions",
ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions),
MediaType.APPLICATION_JSON_TYPE));
}
-
- request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace())
- .path(functionConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA),
- ErrorData.class);
+ WebTarget path = functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace())
+ .path(functionConfig.getName());
+ return asyncPutRequest(path, Entity.entity(mp, MediaType.MULTIPART_FORM_DATA)); // normal path: the helper's future is returned directly
} catch (Exception e) {
- throw getApiException(e);
+ future.completeExceptionally(getApiException(e)); // e.g. a JSON serialization error raised above
}
+ return future;
}
@Override
@@ -268,104 +491,229 @@ public class FunctionsImpl extends ComponentResource implements Functions {
}
@Override
+ public CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl) {
+ return updateFunctionWithUrlAsync(functionConfig, pkgUrl, null); // convenience overload: no UpdateOptions
+ }
+
+ @Override
public String triggerFunction(String tenant, String namespace, String functionName, String topic, String triggerValue, String triggerFile) throws PulsarAdminException {
try {
- final FormDataMultiPart mp = new FormDataMultiPart();
- if (triggerFile != null) {
- mp.bodyPart(new FileDataBodyPart("dataStream",
- new File(triggerFile),
- MediaType.APPLICATION_OCTET_STREAM_TYPE));
- }
- if (triggerValue != null) {
- mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE));
- }
- if (topic != null && !topic.isEmpty()) {
- mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE));
- }
- return request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
- .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), String.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return triggerFunctionAsync(tenant, namespace, functionName, topic, triggerValue, triggerFile)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: returns the response body, waiting at most readTimeoutMs
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<String> triggerFunctionAsync(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile) {
+ final FormDataMultiPart mp = new FormDataMultiPart(); // every part below is optional; absent inputs are simply omitted
+ if (triggerFile != null) {
+ mp.bodyPart(new FileDataBodyPart("dataStream",
+ new File(triggerFile),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ }
+ if (triggerValue != null) {
+ mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE));
+ }
+ if (topic != null && !topic.isEmpty()) {
+ mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE));
+ }
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path("trigger"); // target: <tenant>/<namespace>/<function>/trigger
+
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ try {
+ request(path).async().post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), new InvocationCallback<String>() {
+
+ @Override
+ public void completed(String response) {
+ future.complete(response); // the response body is handed back as a String
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ log.warn("[{}] Failed to perform http post request: {}", path.getUri(), throwable.getMessage());
+ future.completeExceptionally(getApiException(throwable.getCause())); // maps the callback throwable's cause, not the throwable itself
+ }
+
+ });
+ } catch (PulsarAdminException cae) { // NOTE(review): presumably raised by request(path) -- confirm
+ future.completeExceptionally(cae);
}
+ return future;
}
@Override
public void restartFunction(String tenant, String namespace, String functionName, int instanceId)
throws PulsarAdminException {
try {
- request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
- .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ restartFunctionAsync(tenant, namespace, functionName, instanceId)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async call
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function, int instanceId) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(instanceId)) // target: <tenant>/<namespace>/<function>/<instanceId>/restart
+ .path("restart");
+ return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); // POST with an empty JSON entity
+ }
+
+ @Override
public void restartFunction(String tenant, String namespace, String functionName) throws PulsarAdminException {
try {
- request(functions.path(tenant).path(namespace).path(functionName).path("restart"))
- .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ restartFunctionAsync(tenant, namespace, functionName)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async call
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path("restart"); // no instance segment: target is <tenant>/<namespace>/<function>/restart
+ return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); // POST with an empty JSON entity
+ }
+
+ @Override
public void stopFunction(String tenant, String namespace, String functionName, int instanceId)
throws PulsarAdminException {
try {
- request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
- .path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ stopFunctionAsync(tenant, namespace, functionName, instanceId)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async call
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function, int instanceId) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(instanceId)) // target: <tenant>/<namespace>/<function>/<instanceId>/stop
+ .path("stop");
+ return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); // POST with an empty JSON entity
+ }
+
+ @Override
public void stopFunction(String tenant, String namespace, String functionName) throws PulsarAdminException {
try {
- request(functions.path(tenant).path(namespace).path(functionName).path("stop"))
- .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ stopFunctionAsync(tenant, namespace, functionName)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async call
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path("stop"); // no instance segment: target is <tenant>/<namespace>/<function>/stop
+ return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); // POST with an empty JSON entity
+ }
+
+ @Override
public void startFunction(String tenant, String namespace, String functionName, int instanceId)
throws PulsarAdminException {
try {
- request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
- .path("start")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ startFunctionAsync(tenant, namespace, functionName, instanceId)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async call
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function, int instanceId) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(instanceId)) // target: <tenant>/<namespace>/<function>/<instanceId>/start
+ .path("start");
+ return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); // POST with an empty JSON entity
+ }
+
+ @Override
public void startFunction(String tenant, String namespace, String functionName) throws PulsarAdminException {
try {
- request(functions.path(tenant).path(namespace).path(functionName).path("start"))
- .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ startFunctionAsync(tenant, namespace, functionName)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async call
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path("start"); // no instance segment: target is <tenant>/<namespace>/<function>/start
+ return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); // POST with an empty JSON entity
+ }
+
+ @Override
public void uploadFunction(String sourceFile, String path) throws PulsarAdminException {
try {
+ uploadFunctionAsync(sourceFile, path).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async call
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ /**
+ * Uploads a local file asynchronously with a multipart POST to the {@code upload} path.
+ *
+ * @param sourceFile local file sent as the "data" part
+ * @param path sent as the plain-text "path" part
+ * @return a future that completes with null on a 2xx response, and exceptionally (via getApiException) on any
+ * other status, on a request-building error, or on a transport failure
+ */
+ @Override
+ public CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String path) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
RequestBuilder builder = post(functions.path("upload").getUri().toASCIIString())
.addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
.addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));
- org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
- if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
- throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
- }
+ asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture()
+ .thenAccept(response -> {
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()));
+ } else {
+ future.complete(null);
+ }
+ })
+ .exceptionally(throwable -> {
+ // Without this stage a transport failure (or an exception thrown by the callback above) would leave
+ // the returned future incomplete, and blocking callers would only ever observe a timeout.
+ Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
+ future.completeExceptionally(getApiException(cause));
+ return null;
+ });
} catch (Exception e) {
- throw getApiException(e);
+ future.completeExceptionally(getApiException(e));
}
+ return future;
}
@Override
@@ -374,12 +722,35 @@ public class FunctionsImpl extends ComponentResource implements Functions {
}
@Override
+ public CompletableFuture<Void> downloadFunctionAsync(String destinationPath, String tenant, String namespace, String functionName) {
+ return downloadFileAsync(destinationPath, functions.path(tenant).path(namespace).path(functionName).path("download")); // source: <tenant>/<namespace>/<functionName>/download
+ }
+
+ @Override
public void downloadFunction(String destinationPath, String path) throws PulsarAdminException {
downloadFile(destinationPath, functions.path("download").queryParam("path", path)); // source: download?path=<path>
}
+ @Override
+ public CompletableFuture<Void> downloadFunctionAsync(String destinationFile, String path) {
+ return downloadFileAsync(destinationFile, functions.path("download").queryParam("path", path)); // source: download?path=<path>
+ }
+
private void downloadFile(String destinationPath, WebTarget target) throws PulsarAdminException {
- HttpResponseStatus status;
+ try {
+ downloadFileAsync(destinationPath, target).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async download
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ private CompletableFuture<Void> downloadFileAsync(String destinationPath, WebTarget target) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
try {
File file = new File(destinationPath);
if (!file.exists()) {
@@ -389,7 +760,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
RequestBuilder builder = get(target.getUri().toASCIIString());
- Future<HttpResponseStatus> whenStatusCode
+ CompletableFuture<HttpResponseStatus> statusFuture
= asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(), new AsyncHandler<HttpResponseStatus>() {
private HttpResponseStatus status;
@@ -409,7 +780,6 @@ public class FunctionsImpl extends ComponentResource implements Functions {
@Override
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
-
os.write(bodyPart.getBodyByteBuffer());
return State.CONTINUE;
}
@@ -422,17 +792,27 @@ public class FunctionsImpl extends ComponentResource implements Functions {
@Override
public void onThrowable(Throwable t) {
}
- });
-
- status = whenStatusCode.get();
- os.close();
+ }).toCompletableFuture();
+
+ statusFuture.thenAccept(status -> {
+ try {
+ os.close();
+ } catch (Exception e) {
+ future.completeExceptionally(getApiException(e));
+ return;
+ }
- if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
- throw getApiException(Response.status(status.getStatusCode()).entity(status.getStatusText()).build());
- }
+ if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
+ future.completeExceptionally(
+ getApiException(Response.status(status.getStatusCode()).entity(status.getStatusText()).build()));
+ } else {
+ future.complete(null);
+ }
+ });
} catch (Exception e) {
- throw getApiException(e);
+ future.completeExceptionally(getApiException(e));
}
+ return future;
}
@Override
@@ -474,30 +854,73 @@ public class FunctionsImpl extends ComponentResource implements Functions {
public FunctionState getFunctionState(String tenant, String namespace, String function, String key)
throws PulsarAdminException {
try {
- Response response = request(functions.path(tenant)
- .path(namespace).path(function).path("state").path(key)).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw getApiException(response);
- }
- return response.readEntity(FunctionState.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getFunctionStateAsync(tenant, namespace, function, key)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: returns the fetched state, waiting at most readTimeoutMs
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<FunctionState> getFunctionStateAsync(String tenant, String namespace, String function, String key) {
+ WebTarget path = functions.path(tenant).path(namespace).path(function).path("state").path(key); // target: <tenant>/<namespace>/<function>/state/<key>
+ final CompletableFuture<FunctionState> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) { // anything other than 200 OK is reported as a failure
+ future.completeExceptionally(getApiException(response));
+ } else {
+ future.complete(response.readEntity(FunctionState.class)); // deserialize the body into a FunctionState
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause())); // maps the callback throwable's cause, not the throwable itself
+ }
+ });
+ return future;
+ }
+
+ @Override
public void putFunctionState(String tenant, String namespace, String function, FunctionState state)
throws PulsarAdminException {
try {
- RequestBuilder builder = post(functions.path(tenant).path(namespace).path(function).path("state").path(state.getKey()).getUri().toASCIIString());
- builder.addBodyPart(new StringPart("state", ObjectMapperFactory.getThreadLocal().writeValueAsString(state), MediaType.APPLICATION_JSON));
- org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
+ putFunctionStateAsync(tenant, namespace, function, state).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // blocking facade: wait at most readTimeoutMs for the async call
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause(); // rethrows the async failure; a cause of any other type would make this cast throw ClassCastException
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ /**
+ * Stores a function state entry asynchronously with a multipart POST to
+ * {@code <tenant>/<namespace>/<function>/state/<state key>}.
+ *
+ * @param tenant first path segment
+ * @param namespace second path segment
+ * @param function third path segment
+ * @param state its key forms the last path segment; the whole object is sent as the JSON part "state"
+ * @return a future that completes with null on a 2xx response, and exceptionally (via getApiException) on any
+ * other status, on a request-building error, or on a transport failure
+ */
+ @Override
+ public CompletableFuture<Void> putFunctionStateAsync(String tenant, String namespace, String function, FunctionState state) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
+ RequestBuilder builder = post(functions.path(tenant).path(namespace).path(function).path("state").path(state.getKey()).getUri().toASCIIString());
+ builder.addBodyPart(new StringPart("state", ObjectMapperFactory.getThreadLocal().writeValueAsString(state), MediaType.APPLICATION_JSON));
+ asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture().thenAccept(response -> {
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()));
+ } else {
+ future.complete(null);
+ }
+ }).exceptionally(throwable -> {
+ // Without this stage a transport failure (or an exception thrown by the callback above) would leave
+ // the returned future incomplete, and blocking callers would only ever observe a timeout.
+ Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
+ future.completeExceptionally(getApiException(cause));
+ return null;
+ });
- if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
- throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
- }
} catch (Exception e) {
- throw getApiException(e);
+ // Map to an admin exception like the other async methods do; the blocking wrapper casts the cause
+ // to PulsarAdminException, so completing with the raw exception would end in a ClassCastException.
+ future.completeExceptionally(getApiException(e));
}
+ return future;
}
}